1 /*
2 Copyright 2005-2010 Jakub Kruszona-Zawadzki, Gemius SA, 2013-2014 EditShare, 2013-2017 Skytechnology sp. z o.o..
3
4 This file was part of MooseFS and is part of LizardFS.
5
6 LizardFS is free software: you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation, version 3.
9
10 LizardFS is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with LizardFS If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 #include "common/platform.h"
20 #include "mount/readdata.h"
21
22 #include <errno.h>
23 #include <inttypes.h>
24 #include <limits.h>
25 #include <pthread.h>
26 #include <stdio.h>
27 #include <stdlib.h>
28 #include <string.h>
29 #include <sys/time.h>
30 #include <time.h>
31 #include <unistd.h>
32 #include <condition_variable>
33 #include <map>
34 #include <mutex>
35
36 #include "common/connection_pool.h"
37 #include "common/datapack.h"
38 #include "common/exceptions.h"
39 #include "common/mfserr.h"
40 #include "common/read_plan_executor.h"
41 #include "common/slogger.h"
42 #include "common/sockets.h"
43 #include "common/time_utils.h"
44 #include "mount/chunk_locator.h"
45 #include "mount/chunk_reader.h"
46 #include "mount/mastercomm.h"
47 #include "mount/readahead_adviser.h"
48 #include "mount/readdata_cache.h"
49 #include "mount/tweaks.h"
50 #include "protocol/MFSCommunication.h"
51
// Period of the delayed-ops housekeeping thread tick, in microseconds (~1/3 s).
#define USECTICK 333333
// Number of ticks after which a readrec is considered stale and the next read
// through it forces a chunk re-preparation (reconnect / relocate).
#define REFRESHTICKS 15

// Simple inode -> readrec hash map: 2^MAPBITS buckets, chained via readrec::mapnext.
#define MAPBITS 10
#define MAPSIZE (1<<(MAPBITS))
#define MAPMASK (MAPSIZE-1)
#define MAPINDX(inode) (inode&MAPMASK)

// Runtime-tunable (via gTweaks) readahead window limit in bytes and cache TTL in ms.
static std::atomic<uint32_t> gReadaheadMaxWindowSize;
static std::atomic<uint32_t> gCacheExpirationTime_ms;
62
// Per-open-file read state: chunk reader, block cache and readahead adviser.
// Records live on the global list (rdhead/next) and in the inode hash map
// (rdinodemap/mapnext); they are created by read_data_new(), marked expired by
// read_data_end() and physically deleted by the delayed-ops thread.
struct readrec {
	ChunkReader reader;
	ReadCache cache;
	ReadaheadAdviser readahead_adviser;
	std::vector<uint8_t> read_buffer;
	uint32_t inode;
	uint8_t refreshCounter; // protected by gMutex; aged each tick, forces re-prepare at REFRESHTICKS
	bool expired; // protected by gMutex; set by read_data_end(), reaped by delayed-ops thread
	struct readrec *next; // protected by gMutex; global list link
	struct readrec *mapnext; // protected by gMutex; inode hash chain link

	readrec(uint32_t inode, ChunkConnector& connector, double bandwidth_overuse)
			: reader(connector, bandwidth_overuse),
			cache(gCacheExpirationTime_ms),
			readahead_adviser(gCacheExpirationTime_ms, gReadaheadMaxWindowSize),
			inode(inode),
			refreshCounter(0),
			expired(false),
			next(nullptr),
			mapnext(nullptr) {
	}
};
85
// Shared chunkserver connection pool and the connector that draws from it.
static ConnectionPool gReadConnectionPool;
static ChunkConnectorUsingPool gChunkConnector(gReadConnectionPool);
// Guards rdhead, rdinodemap and the gMutex-annotated readrec fields.
static std::mutex gMutex;
static readrec *rdinodemap[MAPSIZE];
static readrec *rdhead=NULL;
static pthread_t delayedOpsThread;
// Runtime-tunable (via gTweaks) timeouts and retry settings.
static std::atomic<uint32_t> gChunkserverConnectTimeout_ms;
static std::atomic<uint32_t> gChunkserverWaveReadTimeout_ms;
static std::atomic<uint32_t> gChunkserverTotalReadTimeout_ms;
static std::atomic<bool> gPrefetchXorStripes;
static bool readDataTerminate; // protected by gMutex; tells the delayed-ops thread to exit
static std::atomic<uint32_t> maxRetries;
static double gBandwidthOveruse;

// Out-of-class definitions for the ReadaheadAdviser static constants; required
// for ODR-uses when building as pre-C++17 (no implicit inline variables).
const unsigned ReadaheadAdviser::kInitWindowSize;
const unsigned ReadaheadAdviser::kDefaultWindowSizeLimit;
const int ReadaheadAdviser::kRandomThreshold;
const int ReadaheadAdviser::kHistoryEntryLifespan_ns;
const int ReadaheadAdviser::kHistoryCapacity;
const unsigned ReadaheadAdviser::kHistoryValidityThreshold;
106
read_data_get_wave_read_timeout_ms()107 uint32_t read_data_get_wave_read_timeout_ms() {
108 return gChunkserverWaveReadTimeout_ms;
109 }
110
read_data_get_connect_timeout_ms()111 uint32_t read_data_get_connect_timeout_ms() {
112 return gChunkserverConnectTimeout_ms;
113 }
114
read_data_get_total_read_timeout_ms()115 uint32_t read_data_get_total_read_timeout_ms() {
116 return gChunkserverTotalReadTimeout_ms;
117 }
118
read_data_get_prefetchxorstripes()119 bool read_data_get_prefetchxorstripes() {
120 return gPrefetchXorStripes;
121 }
122
read_data_delayed_ops(void * arg)123 void* read_data_delayed_ops(void *arg) {
124 readrec *rrec,**rrecp;
125 readrec **rrecmap;
126 (void)arg;
127 for (;;) {
128 gReadConnectionPool.cleanup();
129 std::unique_lock<std::mutex> lock(gMutex);
130 if (readDataTerminate) {
131 return NULL;
132 }
133 rrecp = &rdhead;
134 while ((rrec = *rrecp) != NULL) {
135 if (rrec->refreshCounter < REFRESHTICKS) {
136 rrec->refreshCounter++;
137 }
138 if (rrec->expired) {
139 *rrecp = rrec->next;
140 rrecmap = &(rdinodemap[MAPINDX(rrec->inode)]);
141 while (*rrecmap) {
142 if ((*rrecmap)==rrec) {
143 *rrecmap = rrec->mapnext;
144 } else {
145 rrecmap = &((*rrecmap)->mapnext);
146 }
147 }
148 delete rrec;
149 } else {
150 rrecp = &(rrec->next);
151 }
152 }
153 lock.unlock();
154 usleep(USECTICK);
155 }
156 }
157
read_data_new(uint32_t inode)158 void* read_data_new(uint32_t inode) {
159 readrec *rrec = new readrec(inode, gChunkConnector, gBandwidthOveruse);
160 std::unique_lock<std::mutex> lock(gMutex);
161 rrec->next = rdhead;
162 rdhead = rrec;
163 rrec->mapnext = rdinodemap[MAPINDX(inode)];
164 rdinodemap[MAPINDX(inode)] = rrec;
165 return rrec;
166 }
167
read_data_end(void * rr)168 void read_data_end(void* rr) {
169 readrec *rrec = (readrec*)rr;
170
171 std::unique_lock<std::mutex> lock(gMutex);
172 rrec->expired = true;
173 }
174
read_data_init(uint32_t retries,uint32_t chunkserverRoundTripTime_ms,uint32_t chunkserverConnectTimeout_ms,uint32_t chunkServerWaveReadTimeout_ms,uint32_t chunkserverTotalReadTimeout_ms,uint32_t cache_expiration_time_ms,uint32_t readahead_max_window_size_kB,bool prefetchXorStripes,double bandwidth_overuse)175 void read_data_init(uint32_t retries,
176 uint32_t chunkserverRoundTripTime_ms,
177 uint32_t chunkserverConnectTimeout_ms,
178 uint32_t chunkServerWaveReadTimeout_ms,
179 uint32_t chunkserverTotalReadTimeout_ms,
180 uint32_t cache_expiration_time_ms,
181 uint32_t readahead_max_window_size_kB,
182 bool prefetchXorStripes,
183 double bandwidth_overuse) {
184 uint32_t i;
185 pthread_attr_t thattr;
186
187 readDataTerminate = false;
188 for (i=0 ; i<MAPSIZE ; i++) {
189 rdinodemap[i]=NULL;
190 }
191 maxRetries=retries;
192 gChunkserverConnectTimeout_ms = chunkserverConnectTimeout_ms;
193 gChunkserverWaveReadTimeout_ms = chunkServerWaveReadTimeout_ms;
194 gChunkserverTotalReadTimeout_ms = chunkserverTotalReadTimeout_ms;
195 gCacheExpirationTime_ms = cache_expiration_time_ms;
196 gReadaheadMaxWindowSize = readahead_max_window_size_kB * 1024;
197 gPrefetchXorStripes = prefetchXorStripes;
198 gBandwidthOveruse = bandwidth_overuse;
199 gTweaks.registerVariable("PrefetchXorStripes", gPrefetchXorStripes);
200 gChunkConnector.setRoundTripTime(chunkserverRoundTripTime_ms);
201 gChunkConnector.setSourceIp(fs_getsrcip());
202 pthread_attr_init(&thattr);
203 pthread_attr_setstacksize(&thattr,0x100000);
204 pthread_create(&delayedOpsThread,&thattr,read_data_delayed_ops,NULL);
205 pthread_attr_destroy(&thattr);
206
207 gTweaks.registerVariable("ReadMaxRetries", maxRetries);
208 gTweaks.registerVariable("ReadConnectTimeout", gChunkserverConnectTimeout_ms);
209 gTweaks.registerVariable("ReadWaveTimeout", gChunkserverWaveReadTimeout_ms);
210 gTweaks.registerVariable("ReadTotalTimeout", gChunkserverTotalReadTimeout_ms);
211 gTweaks.registerVariable("CacheExpirationTime", gCacheExpirationTime_ms);
212 gTweaks.registerVariable("ReadaheadMaxWindowSize", gReadaheadMaxWindowSize);
213 gTweaks.registerVariable("ReadChunkPrepare", ChunkReader::preparations);
214 gTweaks.registerVariable("ReqExecutedTotal", ReadPlanExecutor::executions_total_);
215 gTweaks.registerVariable("ReqExecutedUsingAll", ReadPlanExecutor::executions_with_additional_operations_);
216 gTweaks.registerVariable("ReqFinishedUsingAll", ReadPlanExecutor::executions_finished_by_additional_operations_);
217 }
218
read_data_term(void)219 void read_data_term(void) {
220 readrec *rr,*rrn;
221
222 {
223 std::unique_lock<std::mutex> lock(gMutex);
224 readDataTerminate = true;
225 }
226
227 pthread_join(delayedOpsThread,NULL);
228 for (rr = rdhead ; rr ; rr = rrn) {
229 rrn = rr->next;
230 delete rr;
231 }
232 for (auto& rr : rdinodemap) {
233 rr = NULL;
234 }
235 rdhead = NULL;
236 }
237
read_inode_ops(uint32_t inode)238 void read_inode_ops(uint32_t inode) { // attributes of inode have been changed - force reconnect and clear cache
239 readrec *rrec;
240 std::unique_lock<std::mutex> lock(gMutex);
241 for (rrec = rdinodemap[MAPINDX(inode)] ; rrec ; rrec=rrec->mapnext) {
242 if (rrec->inode == inode) {
243 rrec->refreshCounter = REFRESHTICKS; // force reconnect on forthcoming access
244 }
245 }
246 }
247
read_data_sleep_time_ms(int tryCounter)248 int read_data_sleep_time_ms(int tryCounter) {
249 if (tryCounter <= 13) { // 2^13 = 8192
250 return (1 << tryCounter); // 2^tryCounter milliseconds
251 } else {
252 return 1000 * 10; // 10 seconds
253 }
254 }
255
print_error_msg(const readrec * rrec,uint32_t try_counter,const Exception & ex)256 static void print_error_msg(const readrec *rrec, uint32_t try_counter, const Exception &ex) {
257 if (rrec->reader.isChunkLocated()) {
258 lzfs_pretty_syslog(LOG_WARNING,
259 "read file error, inode: %u, index: %u, chunk: %lu, version: %u - %s "
260 "(try counter: %u)", rrec->reader.inode(), rrec->reader.index(),
261 rrec->reader.chunkId(), rrec->reader.version(), ex.what(), try_counter);
262 } else {
263 lzfs_pretty_syslog(LOG_WARNING,
264 "read file error, inode: %u, index: %u, chunk: failed to locate - %s "
265 "(try counter: %u)", rrec->reader.inode(), rrec->reader.index(),
266 ex.what(), try_counter);
267 }
268 }
269
read_to_buffer(readrec * rrec,uint64_t current_offset,uint64_t bytes_to_read,std::vector<uint8_t> & read_buffer,uint64_t * bytes_read)270 static int read_to_buffer(readrec *rrec, uint64_t current_offset, uint64_t bytes_to_read,
271 std::vector<uint8_t> &read_buffer, uint64_t *bytes_read) {
272 uint32_t try_counter = 0;
273 uint32_t prepared_inode = 0; // this is always different than any real inode
274 uint32_t prepared_chunk_id = 0;
275 assert(*bytes_read == 0);
276
277 // forced sleep between retries caused by recoverable failures
278 uint32_t sleep_time_ms = 0;
279
280 std::unique_lock<std::mutex> lock(gMutex);
281 bool force_prepare = (rrec->refreshCounter == REFRESHTICKS);
282 lock.unlock();
283
284 while (bytes_to_read > 0) {
285 Timeout sleep_timeout = Timeout(std::chrono::milliseconds(sleep_time_ms));
286 // Increase communicationTimeout to sleepTime; longer poll() can't be worse
287 // than short poll() followed by nonproductive usleep().
288 uint32_t timeout_ms = std::max(gChunkserverTotalReadTimeout_ms.load(), sleep_time_ms);
289 Timeout communication_timeout = Timeout(std::chrono::milliseconds(timeout_ms));
290 sleep_time_ms = 0;
291 try {
292 uint32_t chunk_id = current_offset / MFSCHUNKSIZE;
293 if (force_prepare || prepared_inode != rrec->inode || prepared_chunk_id != chunk_id) {
294 rrec->reader.prepareReadingChunk(rrec->inode, chunk_id, force_prepare);
295 prepared_chunk_id = chunk_id;
296 prepared_inode = rrec->inode;
297 force_prepare = false;
298 lock.lock();
299 rrec->refreshCounter = 0;
300 lock.unlock();
301 }
302
303 uint64_t offset_of_chunk = static_cast<uint64_t>(chunk_id) * MFSCHUNKSIZE;
304 uint32_t offset_in_chunk = current_offset - offset_of_chunk;
305 uint32_t size_in_chunk = MFSCHUNKSIZE - offset_in_chunk;
306 if (size_in_chunk > bytes_to_read) {
307 size_in_chunk = bytes_to_read;
308 }
309 uint32_t bytes_read_from_chunk = rrec->reader.readData(
310 read_buffer, offset_in_chunk, size_in_chunk,
311 gChunkserverConnectTimeout_ms, gChunkserverWaveReadTimeout_ms,
312 communication_timeout, gPrefetchXorStripes);
313 // No exceptions thrown. We can increase the counters and go to the next chunk
314 *bytes_read += bytes_read_from_chunk;
315 current_offset += bytes_read_from_chunk;
316 bytes_to_read -= bytes_read_from_chunk;
317 if (bytes_read_from_chunk < size_in_chunk) {
318 // end of file
319 break;
320 }
321 try_counter = 0;
322 } catch (UnrecoverableReadException &ex) {
323 print_error_msg(rrec, try_counter, ex);
324 if (ex.status() == LIZARDFS_ERROR_ENOENT) {
325 return LIZARDFS_ERROR_EBADF; // stale handle
326 } else {
327 return LIZARDFS_ERROR_IO;
328 }
329 } catch (Exception &ex) {
330 if (try_counter > 0) {
331 print_error_msg(rrec, try_counter, ex);
332 }
333 force_prepare = true;
334 if (try_counter > maxRetries) {
335 return LIZARDFS_ERROR_IO;
336 } else {
337 usleep(sleep_timeout.remaining_us());
338 sleep_time_ms = read_data_sleep_time_ms(try_counter);
339 }
340 try_counter++;
341 }
342 }
343 return LIZARDFS_STATUS_OK;
344 }
345
read_data(void * rr,uint64_t offset,uint32_t size,ReadCache::Result & ret)346 int read_data(void *rr, uint64_t offset, uint32_t size, ReadCache::Result &ret) {
347 readrec *rrec = (readrec*)rr;
348 assert(size % MFSBLOCKSIZE == 0);
349 assert(offset % MFSBLOCKSIZE == 0);
350
351 if (size == 0) {
352 return LIZARDFS_STATUS_OK;
353 }
354
355 rrec->readahead_adviser.feed(offset, size);
356
357 ReadCache::Result result = rrec->cache.query(offset, size);
358
359 if (result.frontOffset() <= offset && offset + size <= result.endOffset()) {
360 ret = std::move(result);
361 return LIZARDFS_STATUS_OK;
362 }
363 uint64_t request_offset = result.remainingOffset();
364 uint64_t bytes_to_read_left = std::max<uint64_t>(size, rrec->readahead_adviser.window()) - (request_offset - offset);
365 bytes_to_read_left = (bytes_to_read_left + MFSBLOCKSIZE - 1) / MFSBLOCKSIZE * MFSBLOCKSIZE;
366
367 uint64_t bytes_read = 0;
368 int err = read_to_buffer(rrec, request_offset, bytes_to_read_left, result.inputBuffer(), &bytes_read);
369 if (err) {
370 // paranoia check - discard any leftover bytes from incorrect read
371 result.inputBuffer().clear();
372 return err;
373 }
374
375 ret = std::move(result);
376 return LIZARDFS_STATUS_OK;
377 }
378