1 /*
2 Copyright 2005-2010 Jakub Kruszona-Zawadzki, Gemius SA, 2013-2017 Skytechnology sp. z o.o..
3
4 This file was part of MooseFS and is part of LizardFS.
5
6 LizardFS is free software: you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation, version 3.
9
10 LizardFS is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with LizardFS If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 #include "common/platform.h"
20 #include "mount/writedata.h"
21
22 #include <errno.h>
23 #include <inttypes.h>
24 #include <limits.h>
25 #include <pthread.h>
26 #include <stdio.h>
27 #include <stdlib.h>
28 #include <string.h>
29 #include <sys/time.h>
30 #include <sys/types.h>
31 #include <unistd.h>
32 #include <algorithm>
33 #include <condition_variable>
34 #include <mutex>
35 #include <vector>
36
37 #include "common/chunk_connector.h"
38
39 #include "common/crc.h"
40 #include "common/datapack.h"
41 #include "common/exceptions.h"
42 #include "common/goal.h"
43 #include "common/massert.h"
44 #include "common/message_receive_buffer.h"
45 #include "common/mfserr.h"
46 #include "common/multi_buffer_writer.h"
47 #include "common/pcqueue.h"
48 #include "common/slice_traits.h"
49 #include "common/slogger.h"
50 #include "common/sockets.h"
51 #include "common/time_utils.h"
52 #include "devtools/request_log.h"
53 #include "mount/chunk_writer.h"
54 #include "mount/global_chunkserver_stats.h"
55 #include "mount/mastercomm.h"
56 #include "mount/readdata.h"
57 #include "mount/tweaks.h"
58 #include "mount/write_cache_block.h"
59 #include "protocol/cltocs.h"
60 #include "protocol/MFSCommunication.h"
61
// Seconds an idle chunkserver connection may be kept.
// NOTE(review): not referenced in the visible part of this file -- presumably
// consumed by the connection-pool code; confirm before removing.
#define IDLE_CONNECTION_TIMEOUT 6
// Number of buckets in the inode -> inodedata hash table (idhash).
#define IDHASHSIZE 256
// Multiplicative hash mapping an inode number to an idhash bucket index.
#define IDHASH(inode) (((inode)*0xB239FB71)%IDHASHSIZE)
65
66 namespace {
67
/*! Per-inode write state: the cache of dirty blocks plus bookkeeping shared
 *  between application threads and the write workers.
 *
 *  One instance per inode opened for writing, linked into the idhash table.
 *  Unless a member function says otherwise, all fields are protected by the
 *  global mutex (gMutex).
 */
struct inodedata {
	uint32_t inode;            // inode number (hash key)
	uint64_t maxfleng;         // locally cached upper bound of the file length
	int status;                // sticky status of the first write error (LIZARDFS_STATUS_OK if none)
	uint16_t flushwaiting;     // number of threads waiting for a flush to complete
	uint16_t writewaiting;     // number of threads blocked in write_data while a flush is pending
	uint16_t lcnt;             // reference count; entry is freed when it drops to 0 (write_data_lcnt_decrease)
	uint32_t trycnt;           // consecutive failed write attempts for this inode
	bool inqueue; // true if this inode is waiting in one of the queues or is being processed
	uint32_t minimumBlocksToWrite; // writer's hint: how many blocks are worth sending at once
	std::list<WriteCacheBlock> dataChain; // queued dirty blocks, oldest at the front
	int alterations_in_chain; // number of adherent blocks with different chunk ids in chain
	std::condition_variable flushcond; // wait for !inqueue (flush)
	std::condition_variable writecond; // wait for flushwaiting==0 (write)
	inodedata *next;           // next entry in the same idhash bucket
	std::unique_ptr<WriteChunkLocator> locator; // chunk lock kept across an unfinished/retried job
	int newDataInChainPipe[2]; // self-pipe used to wake a worker that polls chunkservers
	bool workerWaitingForData; // worker asked to be poked via the pipe when new data arrives
	Timer lastWriteToDataChain;    // time since the application last appended to dataChain
	Timer lastWriteToChunkservers; // time since data was last sent to chunkservers

	// Initializes an idle entry and creates the wake-up pipe (not on Windows).
	inodedata(uint32_t inode)
			: inode(inode),
			  maxfleng(0),
			  status(LIZARDFS_STATUS_OK),
			  flushwaiting(0),
			  writewaiting(0),
			  lcnt(0),
			  trycnt(0),
			  inqueue(false),
			  minimumBlocksToWrite(1),
			  alterations_in_chain(),
			  next(nullptr),
			  workerWaitingForData(false) {
#ifdef _WIN32
		// We don't use inodeData->waitingworker and inodeData->pipe on Cygwin because
		// Cygwin's implementation of mixed socket & pipe polling is very inefficient.
		// On mingw platform pipes are unavailable.
		newDataInChainPipe[0] = newDataInChainPipe[1] = -1;
#else
		if (pipe(newDataInChainPipe) < 0) {
			lzfs_pretty_syslog(LOG_WARNING, "creating pipe error: %s", strerr(errno));
			newDataInChainPipe[0] = -1;
		}
#endif
	}

	// Closes both pipe ends if the pipe was successfully created.
	~inodedata() {
		if (isDataChainPipeValid()) {
			close(newDataInChainPipe[0]);
			close(newDataInChainPipe[1]);
		}
	}

	/* glock: LOCKED */
	void wakeUpWorkerIfNecessary() {
		/*
		 * Write worker always looks for the first block in chain and we modify or add always the
		 * last block in chain so it is necessary to wake up write worker only if the first block
		 * is the last one, ie. dataChain.size() == 1.
		 */
		if (workerWaitingForData && dataChain.size() == 1 && isDataChainPipeValid()) {
			if (write(newDataInChainPipe[1], " ", 1) != 1) {
				lzfs_pretty_syslog(LOG_ERR, "write pipe error: %s", strerr(errno));
			}
			workerWaitingForData = false;
		}
	}

	/* glock: UNUSED */
	bool isDataChainPipeValid() const {
		// Only [0] is checked: the constructor resets only [0] on pipe() failure.
		return newDataInChainPipe[0] >= 0;
	}

	/*! Check if inode requires flushing all its data chain to chunkservers.
	 *
	 * Returns true if anyone requested to flush the data by calling write_data_flush
	 * or write_data_flush_inode or the data in data chain is too old to keep it longer in
	 * our buffers. If this function returns false, we write only full stripes from data
	 * chain to chunkservers.
	 * glock: LOCKED
	 */
	bool requiresFlushing() const {
		return (flushwaiting > 0
				|| lastWriteToDataChain.elapsed_ms() >= kMaximumTimeInDataChainSinceLastWrite_ms
				|| lastWriteToChunkservers.elapsed_ms() >= kMaximumTimeInDataChainSinceLastFlush_ms);
	}

	// Appends a block to the chain, bumping alterations_in_chain when the new
	// tail belongs to a different chunk than the previous tail. glock: LOCKED
	void pushToChain(WriteCacheBlock &&block) {
		dataChain.push_back(std::move(block));
		if (dataChain.size() > 1 && dataChain.back().chunkIndex != std::next(dataChain.rbegin())->chunkIndex) {
			alterations_in_chain++;
		}
	}

	// Removes the head block, updating alterations_in_chain symmetrically to
	// pushToChain. glock: LOCKED
	void popFromChain() {
		assert(dataChain.size() > 0);
		if (dataChain.size() > 1 && dataChain.front().chunkIndex != std::next(dataChain.begin())->chunkIndex) {
			alterations_in_chain--;
		}
		dataChain.pop_front();
	}

	// Adjusts alterations_in_chain by delta; used when blocks are spliced back
	// in bulk (see InodeChunkWriter::returnJournalToDataChain). glock: LOCKED
	void registerAlterationsInChain(int delta) {
		alterations_in_chain += delta;
	}

	// True when dataChain currently holds blocks of more than one chunk.
	// glock: LOCKED
	bool hasMultipleChunkIdsInChain() const {
		return alterations_in_chain > 0;
	}

private:
	/*! Limit for \p lastWriteToChunkservers after which we force a flush.
	 *
	 * Maximum time for data to be kept in data chain waiting for collecting a full stripe.
	 */
	static const uint32_t kMaximumTimeInDataChainSinceLastFlush_ms = 15000;

	/*! Limit for \p lastWriteToDataChain after which we force a flush.
	 *
	 * Maximum time for data to be kept in data chain waiting for collecting a full stripe
	 * if no new data is written into the data chain
	 */
	static const uint32_t kMaximumTimeInDataChainSinceLastWrite_ms = 5000;
};
193
194 struct DelayedQueueEntry {
195 inodedata *inodeData;
196 int32_t ticksLeft;
197 static constexpr int kTicksPerSecond = 10;
198
DelayedQueueEntry__anone68df2a60111::DelayedQueueEntry199 DelayedQueueEntry(inodedata *inodeData, int32_t ticksLeft)
200 : inodeData(inodeData),
201 ticksLeft(ticksLeft) {
202 }
203 };
204
205 } // anonymous namespace
206
// Maximum number of retries for a failing write before giving up (tweakable via gTweaks).
static std::atomic<uint32_t> maxretries;
// Global lock protecting every inodedata and the module-level state below.
static std::mutex gMutex;
typedef std::unique_lock<std::mutex> Glock;

static std::condition_variable fcbcond; // signalled when free cache blocks become available
static uint32_t fcbwaiting = 0;         // threads blocked in write_cb_wait_for_block
static int64_t freecacheblocks;         // free write-cache blocks (signed: may transiently go negative)
static inodedata **idhash;              // inode -> inodedata hash table (IDHASHSIZE buckets)

static uint32_t gWriteWindowSize;       // max blocks in flight per chunk before waiting for acks
static uint32_t gChunkserverTimeout_ms; // timeout passed to ChunkWriter::init

// percentage of the free cache (1% - 100%) which can be used by one inode
static uint32_t gCachePerInodePercentage;

static pthread_t delayed_queue_worker_th;      // thread ticking the delayed queue
static std::vector<pthread_t> write_worker_th; // pool of write worker threads

static void* jqueue;                              // job queue consumed by write workers
static std::list<DelayedQueueEntry> delayedQueue; // jobs postponed for a number of ticks

static ConnectionPool gChunkserverConnectionPool;
static ChunkConnectorUsingPool gChunkConnector(gChunkserverConnectionPool);
230
write_cb_release_blocks(uint32_t count,Glock &)231 void write_cb_release_blocks(uint32_t count, Glock&) {
232 freecacheblocks += count;
233 if (fcbwaiting > 0 && freecacheblocks > 0) {
234 fcbcond.notify_all();
235 }
236 }
237
/* Takes 'count' blocks from the global free-cache pool.  May drive the
 * counter negative (e.g. when a journal is returned in bulk); waiters in
 * write_cb_wait_for_block only proceed while it is positive. glock: LOCKED */
void write_cb_acquire_blocks(uint32_t count, Glock&) {
	freecacheblocks -= count;
}
241
/*! Blocks until this inode is allowed to allocate one more cache block.
 *
 * Waits while the global pool is exhausted, or while this inode's chain would
 * exceed its per-inode share of the cache.  Note that dataChainSize is
 * sampled once before waiting and the condition is re-evaluated with that
 * sample after each wakeup.
 * glock: LOCKED (released while waiting on fcbcond)
 */
void write_cb_wait_for_block(inodedata* id, Glock& glock) {
	LOG_AVG_TILL_END_OF_SCOPE0("write_cb_wait_for_block");
	fcbwaiting++;
	uint64_t dataChainSize = id->dataChain.size();
	while (freecacheblocks <= 0
			// i.e. dataChainSize / (dataChainSize + freecacheblocks)
			//      > gCachePerInodePercentage / 100
			// -- this inode would use more than its allowed share of the cache
			|| dataChainSize * 100 > (dataChainSize + freecacheblocks) * gCachePerInodePercentage)
	{
		fcbcond.wait(glock);
	}
	fcbwaiting--;
}
255
256 /* inode */
257
write_find_inodedata(uint32_t inode,Glock &)258 inodedata* write_find_inodedata(uint32_t inode, Glock&) {
259 uint32_t idh = IDHASH(inode);
260 for (inodedata* id = idhash[idh]; id; id = id->next) {
261 if (id->inode == inode) {
262 return id;
263 }
264 }
265 return NULL;
266 }
267
write_get_inodedata(uint32_t inode,Glock &)268 inodedata* write_get_inodedata(uint32_t inode, Glock&) {
269 uint32_t idh = IDHASH(inode);
270 inodedata* id;
271 for (inodedata* id = idhash[idh]; id; id = id->next) {
272 if (id->inode == inode) {
273 return id;
274 }
275 }
276 id = new inodedata(inode);
277 id->next = idhash[idh];
278 idhash[idh] = id;
279 return id;
280 }
281
write_free_inodedata(inodedata * fid,Glock &)282 void write_free_inodedata(inodedata* fid, Glock&) {
283 uint32_t idh = IDHASH(fid->inode);
284 inodedata *id, **idp;
285 idp = &(idhash[idh]);
286 while ((id = *idp)) {
287 if (id == fid) {
288 *idp = id->next;
289 delete id;
290 return;
291 }
292 idp = &(id->next);
293 }
294 }
295
296 /* delayed queue */
297
delayed_queue_put(inodedata * id,uint32_t seconds,Glock &)298 static void delayed_queue_put(inodedata* id, uint32_t seconds, Glock&) {
299 delayedQueue.push_back(DelayedQueueEntry(id, seconds * DelayedQueueEntry::kTicksPerSecond));
300 }
301
delayed_queue_remove(inodedata * id,Glock &)302 static bool delayed_queue_remove(inodedata* id, Glock&) {
303 for (auto it = delayedQueue.begin(); it != delayedQueue.end(); ++it) {
304 if (it->inodeData == id) {
305 delayedQueue.erase(it);
306 return true;
307 }
308 }
309 return false;
310 }
311
/*! Thread routine: advances the delayed queue kTicksPerSecond times a second.
 *
 * Each tick decrements ticksLeft of every entry and moves expired entries to
 * the worker job queue.  An entry with a NULL inodeData (enqueued by
 * write_data_term) is the termination sentinel.
 */
void* delayed_queue_worker(void*) {
	for (;;) {
		Timeout timeout(std::chrono::microseconds(1000000 / DelayedQueueEntry::kTicksPerSecond));
		Glock lock(gMutex);
		auto it = delayedQueue.begin();
		while (it != delayedQueue.end()) {
			if (it->inodeData == NULL) {
				// Termination sentinel -- exit; gMutex is released by Glock's destructor.
				return NULL;
			}
			if (--it->ticksLeft <= 0) {
				// Delay expired -- hand the inode to the write workers.
				queue_put(jqueue, 0, 0, reinterpret_cast<uint8_t*>(it->inodeData), 0);
				it = delayedQueue.erase(it);
			} else {
				++it;
			}
		}
		lock.unlock();
		// Sleep out the remainder of this tick.
		usleep(timeout.remaining_us());
	}
	return NULL;
}
333
334 /* queues */
335
write_delayed_enqueue(inodedata * id,uint32_t seconds,Glock & lock)336 void write_delayed_enqueue(inodedata* id, uint32_t seconds, Glock& lock) {
337 if (seconds > 0) {
338 delayed_queue_put(id, seconds, lock);
339 } else {
340 queue_put(jqueue, 0, 0, (uint8_t*) id, 0);
341 }
342 }
343
/* Puts the inode directly into the worker job queue. glock: LOCKED */
void write_enqueue(inodedata* id, Glock&) {
	queue_put(jqueue, 0, 0, (uint8_t*) id, 0);
}
347
write_job_delayed_end(inodedata * id,int status,int seconds,Glock & lock)348 void write_job_delayed_end(inodedata* id, int status, int seconds, Glock &lock) {
349 LOG_AVG_TILL_END_OF_SCOPE0("write_job_delayed_end");
350 LOG_AVG_TILL_END_OF_SCOPE1("write_job_delayed_end#sec", seconds);
351 id->locator.reset();
352 if (status != LIZARDFS_STATUS_OK) {
353 lzfs_pretty_syslog(LOG_WARNING, "error writing file number %" PRIu32 ": %s", id->inode, lizardfs_error_string(status));
354 id->status = status;
355 }
356 status = id->status;
357 if (id->requiresFlushing() > 0) {
358 // Don't sleep if we have to write all the data immediately
359 seconds = 0;
360 }
361 if (!id->dataChain.empty() && status == LIZARDFS_STATUS_OK) { // still have some work to do
362 id->trycnt = 0; // on good write reset try counter
363 write_delayed_enqueue(id, seconds, lock);
364 } else { // no more work or error occurred
365 // if this is an error then release all data blocks
366 write_cb_release_blocks(id->dataChain.size(), lock);
367 id->dataChain.clear();
368 id->inqueue = false;
369 id->maxfleng = 0; // proper file length is now on the master server, remove our length cache
370 if (id->flushwaiting > 0) {
371 id->flushcond.notify_all();
372 }
373 }
374 }
375
/* Finishes a worker pass with no re-enqueue delay. glock: LOCKED */
void write_job_end(inodedata *id, int status, Glock &lock) {
	write_job_delayed_end(id, status, 0, lock);
}
379
/*! Executes a single write job: sends cached blocks of one chunk of one inode
 *  to chunkservers.  A worker thread reuses one instance across jobs.
 */
class InodeChunkWriter {
public:
	InodeChunkWriter() : inodeData_(nullptr), chunkIndex_(0) {}
	/// Processes one job taken from the job queue; see the definition below.
	void processJob(inodedata* data);

private:
	// Pumps blocks from the inode's dataChain into the writer until done or timed out.
	void processDataChain(ChunkWriter& writer);
	// Splices unwritten journal blocks back to the front of the data chain.
	void returnJournalToDataChain(std::list<WriteCacheBlock>&& journal, Glock&);
	// True when the head of the data chain belongs to the chunk being written.
	bool haveAnyBlockInCurrentChunk(Glock&);
	// True when the head block should be handed to the writer now.
	bool haveBlockWorthWriting(uint32_t unfinishedOperationCount, Glock&);
	inodedata* inodeData_;     // inode currently being processed
	uint32_t chunkIndex_;      // index (within the file) of the chunk being written
	Timer wholeOperationTimer; // measures the duration of the whole job

	// Maximum time of writing one chunk
	static const uint32_t kMaximumTime = 30;
	static const uint32_t kMaximumTimeWhenJobsWaiting = 10;
	// For the last 'kTimeToFinishOperations' seconds of maximumTime we won't start new operations
	static const uint32_t kTimeToFinishOperations = 5;
};
400
/*! Processes one write job for an inode.
 *
 * Chooses a chunk (resuming a previously locked one if a locator was kept),
 * locks it, streams blocks via processDataChain, and then either finishes the
 * job, keeps the chunk lock for a retry, or converts repeated failures into
 * an unrecoverable error.
 * glock: UNLOCKED on entry; taken and released internally several times.
 */
void InodeChunkWriter::processJob(inodedata* inodeData) {
	LOG_AVG_TILL_END_OF_SCOPE0("InodeChunkWriter::processJob");
	inodeData_ = inodeData;

	// First, choose index of some chunk to write
	Glock lock(gMutex);
	int status = inodeData_->status;
	bool haveDataToWrite;
	if (inodeData_->locator) {
		// There is a chunk lock left by a previous unfinished job -- let's finish it!
		chunkIndex_ = inodeData_->locator->chunkIndex();
		haveDataToWrite = haveAnyBlockInCurrentChunk(lock);
	} else if (!inodeData_->dataChain.empty()) {
		// There is no unfinished job, but there is some data to write -- let's start a new job
		chunkIndex_ = inodeData_->dataChain.front().chunkIndex;
		haveDataToWrite = true;
	} else {
		// No data, no unfinished jobs -- something wrong!
		// This should never happen, so the status doesn't really matter
		lzfs_pretty_syslog(LOG_WARNING, "got inode with no data to write!!!");
		haveDataToWrite = false;
		status = LIZARDFS_ERROR_EINVAL;
	}
	if (status != LIZARDFS_STATUS_OK) {
		write_job_end(inodeData_, status, lock);
		return;
	}
	lock.unlock();

	/* Process the job */
	ChunkWriter writer(globalChunkserverStats, gChunkConnector, inodeData_->newDataInChainPipe[0]);
	wholeOperationTimer.reset();
	// Take over the locator (if any); otherwise create a fresh one.
	std::unique_ptr<WriteChunkLocator> locator = std::move(inodeData_->locator);
	if (!locator) {
		locator.reset(new WriteChunkLocator());
	}

	try {
		try {
			locator->locateAndLockChunk(inodeData_->inode, chunkIndex_);

			// Optimization -- talk with chunkservers only if we have to write any data.
			// Don't do this if we just have to release some previously unlocked lock.
			if (haveDataToWrite) {
				writer.init(locator.get(), gChunkserverTimeout_ms);
				processDataChain(writer);
				writer.finish(kTimeToFinishOperations * 1000);

				Glock lock(gMutex);
				returnJournalToDataChain(writer.releaseJournal(), lock);
			}
			locator->unlockChunk();
			read_inode_ops(inodeData_->inode);

			// Success path: decide whether the next pass may be delayed.
			Glock lock(gMutex);
			inodeData_->minimumBlocksToWrite = writer.getMinimumBlockCountWorthWriting();
			bool canWait = !inodeData_->requiresFlushing();
			if (!haveAnyBlockInCurrentChunk(lock)) {
				// There is no need to wait if we have just finished writing some chunk.
				// Let's immediately start writing the next chunk (if there is any).
				canWait = false;
			}
			if (inodeData_->hasMultipleChunkIdsInChain()) {
				// Don't wait if there is more than one chunk in the data chain -- the first chunk
				// has to be flushed, because no more data will be added to it
				canWait = false;
			}
			write_job_delayed_end(inodeData_, LIZARDFS_STATUS_OK, (canWait ? 1 : 0), lock);
		} catch (Exception& e) {
			std::string errorString = e.what();
			Glock lock(gMutex);
			if (e.status() != LIZARDFS_ERROR_LOCKED) {
				inodeData_->trycnt++;
				errorString += " (try counter: " + std::to_string(inodeData->trycnt) + ")";
			} else if (inodeData_->trycnt == 0) {
				// Set to nonzero to inform writers, that this task needs to wait a bit
				// Don't increase -- LIZARDFS_ERROR_LOCKED means that chunk is locked by a different client
				// and we have to wait until it is unlocked
				inodeData_->trycnt = 1;
			}
			// Keep the lock
			inodeData_->locator = std::move(locator);
			// Move data left in the journal into front of the write cache
			returnJournalToDataChain(writer.releaseJournal(), lock);
			lock.unlock();

			lzfs_pretty_syslog(LOG_WARNING, "write file error, inode: %" PRIu32 ", index: %" PRIu32 " - %s",
					inodeData_->inode, chunkIndex_, errorString.c_str());
			if (inodeData_->trycnt >= maxretries) {
				// Convert error to an unrecoverable error
				throw UnrecoverableWriteException(e.message(), e.status());
			} else {
				// This may be recoverable or unrecoverable error
				throw;
			}
		}
	} catch (UnrecoverableWriteException& e) {
		// Give up: map the failure to a client-visible status and end the job.
		Glock lock(gMutex);
		if (e.status() == LIZARDFS_ERROR_ENOENT) {
			write_job_end(inodeData_, LIZARDFS_ERROR_EBADF, lock);
		} else if (e.status() == LIZARDFS_ERROR_QUOTA) {
			write_job_end(inodeData_, LIZARDFS_ERROR_QUOTA, lock);
		} else if (e.status() == LIZARDFS_ERROR_NOSPACE || e.status() == LIZARDFS_ERROR_NOCHUNKSERVERS) {
			write_job_end(inodeData_, LIZARDFS_ERROR_NOSPACE, lock);
		} else {
			write_job_end(inodeData_, LIZARDFS_ERROR_IO, lock);
		}
	} catch (Exception& e) {
		// Recoverable error -- retry later, backing off as trycnt grows (max 10s).
		Glock lock(gMutex);
		int waitTime = 1;
		if (inodeData_->trycnt > 10) {
			waitTime = std::min<int>(10, inodeData_->trycnt - 9);
		}
		write_delayed_enqueue(inodeData_, waitTime, lock);
	}
}
517
/*! Main write loop for a single chunk.
 *
 * Repeatedly feeds blocks from the inode's data chain into the writer,
 * switches the writer to flush mode when a flush is required, and lets the
 * writer process chunkserver I/O.  Returns when no operations are pending;
 * throws RecoverableWriteException on timeout.
 * glock: UNLOCKED (taken briefly inside the loop)
 */
void InodeChunkWriter::processDataChain(ChunkWriter& writer) {
	LOG_AVG_TILL_END_OF_SCOPE0("InodeChunkWriter::processDataChain");
	uint32_t maximumTime = kMaximumTime;
	bool otherJobsAreWaiting = false;
	while (true) {
		bool newOtherJobsAreWaiting = !queue_isempty(jqueue);
		if (!otherJobsAreWaiting && newOtherJobsAreWaiting) {
			// Some new jobs have just arrived in the queue -- we should finish faster.
			maximumTime = kMaximumTimeWhenJobsWaiting;
			// But we need at least 5 seconds to finish the operations that are in progress
			uint32_t elapsedSeconds = wholeOperationTimer.elapsed_s();
			if (elapsedSeconds + kTimeToFinishOperations >= maximumTime) {
				maximumTime = elapsedSeconds + kTimeToFinishOperations;
			}
		}
		otherJobsAreWaiting = newOtherJobsAreWaiting;

		// If we have sent the previous message and have some time left, we can take
		// another block from current chunk to process it simultaneously. We won't take anything
		// new if we've already sent 'gWriteWindowSize' blocks and didn't receive status from
		// the chunkserver.
		bool can_expect_next_block = true;
		if (wholeOperationTimer.elapsed_s() + kTimeToFinishOperations < maximumTime
				&& writer.acceptsNewOperations()) {
			Glock lock(gMutex);
			// While there is any block worth sending, we add new write operation
			while (haveBlockWorthWriting(writer.getUnfinishedOperationsCount(), lock)) {
				// Remove block from cache and pass it to the writer
				writer.addOperation(std::move(inodeData_->dataChain.front()));
				inodeData_->popFromChain();
				write_cb_release_blocks(1, lock);
			}
			if (inodeData_->requiresFlushing() && !haveAnyBlockInCurrentChunk(lock)) {
				// No more data and some flushing is needed or required, so flush everything
				writer.startFlushMode();
			}
			if (writer.getUnfinishedOperationsCount() < gWriteWindowSize) {
				// Ask to be poked through the pipe when new data arrives.
				inodeData_->workerWaitingForData = true;
			}
			can_expect_next_block = haveAnyBlockInCurrentChunk(lock);
		} else if (writer.acceptsNewOperations()) {
			// We are running out of time...
			Glock lock(gMutex);
			if (!inodeData_->requiresFlushing()) {
				// Nobody is waiting for the data to be flushed and the data in write chain
				// isn't too old. Let's postpone any operations
				// that didn't start yet and finish them in the next time slice for this chunk
				writer.dropNewOperations();
			} else {
				// Somebody is waiting for a flush, so we have to finish writing everything.
				writer.startFlushMode();
			}
			can_expect_next_block = haveAnyBlockInCurrentChunk(lock);
		}

		if (writer.startNewOperations(can_expect_next_block) > 0) {
			// Data actually went out -- reset the "time since last flush" timer.
			Glock lock(gMutex);
			inodeData_->lastWriteToChunkservers.reset();
		}
		if (writer.getPendingOperationsCount() == 0) {
			return;
		} else if (wholeOperationTimer.elapsed_s() >= maximumTime) {
			throw RecoverableWriteException(
					"Timeout after " + std::to_string(wholeOperationTimer.elapsed_ms()) + " ms",
					LIZARDFS_ERROR_TIMEOUT);
		}

		// Let's sleep a bit shorter if we can't be woken up by the pipe to reduce the latency
		writer.processOperations(inodeData_->isDataChainPipeValid() ? 50 : 10);
	}
}
589
/*! Moves unwritten blocks from the writer's journal back to the data chain.
 *
 * The blocks are re-accounted in the cache counters and spliced in front of
 * the chain.  The alterations counter is updated with the number of chunk-id
 * transitions inside the journal, plus a possible transition at the new
 * journal/chain boundary.
 * glock: LOCKED
 */
void InodeChunkWriter::returnJournalToDataChain(std::list<WriteCacheBlock> &&journal, Glock &lock) {
	if (!journal.empty()) {
		write_cb_acquire_blocks(journal.size(), lock);
		uint64_t prev_id = journal.front().chunkIndex;
		// One extra alteration when the journal's tail meets a chain head from
		// a different chunk.
		int alterations = (!inodeData_->dataChain.empty()
				&& journal.back().chunkIndex != inodeData_->dataChain.front().chunkIndex) ? 1 : 0;
		// Count chunk-id transitions between adjacent journal blocks.
		for (auto it = std::next(journal.begin()); it != journal.end(); ++it) {
			if (it->chunkIndex != prev_id) {
				alterations++;
				prev_id = it->chunkIndex;
			}
		}
		inodeData_->dataChain.splice(inodeData_->dataChain.begin(), std::move(journal));
		inodeData_->registerAlterationsInChain(alterations);
	}
}
606
607 /*
608 * Check if there is any data in the same chunk waiting to be written.
609 */
haveAnyBlockInCurrentChunk(Glock &)610 bool InodeChunkWriter::haveAnyBlockInCurrentChunk(Glock&) {
611 if (inodeData_->dataChain.empty()) {
612 return false;
613 } else {
614 return inodeData_->dataChain.front().chunkIndex == chunkIndex_;
615 }
616 }
617
618 /*
619 * Check if there is any data worth sending to the chunkserver.
620 * We will avoid sending blocks of size different than MFSBLOCKSIZE.
621 * These can be taken only if we are close to run out of tasks to do.
622 * glock: LOCKED
623 */
haveBlockWorthWriting(uint32_t unfinishedOperationCount,Glock & lock)624 bool InodeChunkWriter::haveBlockWorthWriting(uint32_t unfinishedOperationCount, Glock& lock) {
625 if (!haveAnyBlockInCurrentChunk(lock)) {
626 return false;
627 }
628 const auto& block = inodeData_->dataChain.front();
629 if (block.type != WriteCacheBlock::kWritableBlock) {
630 // Always write data, that was previously written
631 return true;
632 } else if (unfinishedOperationCount >= gWriteWindowSize) {
633 // Don't start new operations if there is already a lot of pending writes
634 return false;
635 } else {
636 // Always start full blocks; start partial blocks only if we have to flush the data
637 // or the block won't be expanded (only the last one can be) to a full block
638 return (block.size() == MFSBLOCKSIZE
639 || inodeData_->requiresFlushing()
640 || inodeData_->dataChain.size() > 1);
641 }
642 }
643
644 /* main working thread | glock:UNLOCKED */
write_worker(void *)645 void* write_worker(void*) {
646 InodeChunkWriter inodeDataWriter;
647 for (;;) {
648 // get next job
649 uint32_t z1, z2, z3;
650 uint8_t *data;
651 {
652 LOG_AVG_TILL_END_OF_SCOPE0("write_worker#idle");
653 queue_get(jqueue, &z1, &z2, &data, &z3);
654 }
655 if (data == NULL) {
656 return NULL;
657 }
658
659 // process the job
660 LOG_AVG_TILL_END_OF_SCOPE0("write_worker#working");
661 inodeDataWriter.processJob((inodedata*) data);
662 }
663 return NULL;
664 }
665
666 /* API | glock: INITIALIZED,UNLOCKED */
/*! Initializes the write module.
 *
 * Sets up cache accounting, the inode hash table, the job queue, the delayed
 * queue worker and the pool of write workers.
 * \param cachesize                cache size in MiB (converted to blocks below)
 * \param retries                  max retries before a write is declared failed
 * \param workers                  number of write worker threads
 * \param writewindowsize          max blocks in flight per chunk
 * \param chunkserverTimeout_ms    chunkserver communication timeout
 * \param cachePerInodePercentage  max share of the cache one inode may use
 */
void write_data_init(uint32_t cachesize, uint32_t retries, uint32_t workers,
		uint32_t writewindowsize, uint32_t chunkserverTimeout_ms, uint32_t cachePerInodePercentage) {
	uint64_t cachebytecount = uint64_t(cachesize) * 1024 * 1024;
	uint64_t cacheblockcount = (cachebytecount / MFSBLOCKSIZE);
	uint32_t i;
	pthread_attr_t thattr;

	gChunkConnector.setSourceIp(fs_getsrcip());
	gWriteWindowSize = writewindowsize;
	gChunkserverTimeout_ms = chunkserverTimeout_ms;
	maxretries = retries;
	// Enforce a minimal usable cache.
	if (cacheblockcount < 10) {
		cacheblockcount = 10;
	}

	freecacheblocks = cacheblockcount;
	gCachePerInodePercentage = cachePerInodePercentage;

	// malloc (not new[]) on purpose: write_data_term releases it with free().
	idhash = (inodedata**) malloc(sizeof(inodedata*) * IDHASHSIZE);
	for (i = 0; i < IDHASHSIZE; i++) {
		idhash[i] = NULL;
	}

	jqueue = queue_new(0);

	pthread_attr_init(&thattr);
	pthread_attr_setstacksize(&thattr, 0x100000);
	pthread_create(&delayed_queue_worker_th, &thattr, delayed_queue_worker, NULL);
	write_worker_th.resize(workers);
	for (auto& th : write_worker_th) {
		pthread_create(&th, &thattr, write_worker, NULL);
	}
	pthread_attr_destroy(&thattr);

	gTweaks.registerVariable("WriteMaxRetries", maxretries);
}
703
/*! Shuts the write module down.
 *
 * Stops the delayed-queue thread with a NULL sentinel entry, stops every
 * write worker with one NULL job each, joins all threads, and finally frees
 * the job queue and the inode hash table.
 */
void write_data_term(void) {
	uint32_t i;
	inodedata *id, *idn;

	{
		Glock lock(gMutex);
		delayed_queue_put(nullptr, 0, lock); // termination sentinel for delayed_queue_worker
	}
	// One NULL job per worker makes each worker thread return.
	for (i = 0; i < write_worker_th.size(); i++) {
		queue_put(jqueue, 0, 0, NULL, 0);
	}
	for (i = 0; i < write_worker_th.size(); i++) {
		pthread_join(write_worker_th[i], NULL);
	}
	pthread_join(delayed_queue_worker_th, NULL);
	// Deletes any inodedata pointers still sitting unprocessed in the queue.
	queue_delete(jqueue, queue_deleter_delete<inodedata>);
	for (i = 0; i < IDHASHSIZE; i++) {
		for (id = idhash[i]; id; id = idn) {
			idn = id->next;
			delete id;
		}
	}
	free(idhash);
}
728
729 /* glock: UNLOCKED */
/*! Adds one piece of data (within a single MFSBLOCKSIZE block) to the inode's
 * write cache.
 *
 * First tries to expand the last cached block in place; otherwise waits for a
 * free cache block, appends a new one, and makes sure a worker will pick the
 * inode up (immediately, or sooner than its scheduled delay when the chain is
 * growing large or spans multiple chunks).  Always returns 0.
 * glock: UNLOCKED
 */
int write_block(inodedata *id, uint32_t chindx, uint16_t pos, uint32_t from, uint32_t to, const uint8_t *data) {
	Glock lock(gMutex);
	id->lastWriteToDataChain.reset();

	// Try to expand the last block
	if (!id->dataChain.empty()) {
		auto& lastBlock = id->dataChain.back();
		if (lastBlock.chunkIndex == chindx
				&& lastBlock.blockIndex == pos
				&& lastBlock.type == WriteCacheBlock::kWritableBlock
				&& lastBlock.expand(from, to, data)) {
			id->wakeUpWorkerIfNecessary();
			return 0;
		}
	}

	// Didn't manage to expand an existing block, so allocate a new one
	write_cb_wait_for_block(id, lock);
	write_cb_acquire_blocks(1, lock);
	id->pushToChain(WriteCacheBlock(chindx, pos, WriteCacheBlock::kWritableBlock));
	sassert(id->dataChain.back().expand(from, to, data));
	if (id->inqueue) {
		// Consider some speedup if there are no errors and:
		// - there is a lot of blocks in the write chain
		// - there are at least two chunks in the write chain
		if (id->trycnt == 0 && (id->dataChain.size() > id->minimumBlocksToWrite
				|| id->dataChain.front().chunkIndex != id->dataChain.back().chunkIndex)) {
			if (delayed_queue_remove(id, lock)) {
				write_enqueue(id, lock);
			}
		}
		id->wakeUpWorkerIfNecessary();
	} else {
		// Inode wasn't queued at all -- enqueue it now.
		id->inqueue = true;
		write_enqueue(id, lock);
	}
	return 0;
}
768
769 /* glock: UNLOCKED */
/*! Splits a (offset, size) write into MFSBLOCKSIZE-aligned pieces and feeds
 * them to write_block, advancing block and chunk indices as block/chunk
 * boundaries are crossed.
 * Returns 0 on success or LIZARDFS_ERROR_IO when write_block fails.
 * glock: UNLOCKED
 */
int write_blocks(inodedata *id, uint64_t offset, uint32_t size, const uint8_t* data) {
	LOG_AVG_TILL_END_OF_SCOPE0("write_blocks");
	uint32_t chindx = offset >> MFSCHUNKBITS;               // chunk index within the file
	uint16_t pos = (offset & MFSCHUNKMASK) >> MFSBLOCKBITS; // block index within the chunk
	uint32_t from = offset & MFSBLOCKMASK;                  // starting byte within the block
	while (size > 0) {
		if (size > MFSBLOCKSIZE - from) {
			// Data continues past this block -- write the block's tail and advance.
			if (write_block(id, chindx, pos, from, MFSBLOCKSIZE, data) < 0) {
				return LIZARDFS_ERROR_IO;
			}
			size -= (MFSBLOCKSIZE - from);
			data += (MFSBLOCKSIZE - from);
			from = 0;
			pos++;
			if (pos == MFSBLOCKSINCHUNK) {
				// Crossed a chunk boundary.
				pos = 0;
				chindx++;
			}
		} else {
			// The remainder fits within the current block.
			if (write_block(id, chindx, pos, from, from + size, data) < 0) {
				return LIZARDFS_ERROR_IO;
			}
			size = 0;
		}
	}
	return 0;
}
797
/*! Public write entry point for an inode handle returned by write_data_new.
 *
 * Updates the cached file length, waits until no flush is in progress
 * (flushers take priority over writers), then pushes the data into the write
 * cache via write_blocks.  Returns the inode's sticky error status if a
 * previous write already failed.
 * glock: UNLOCKED on entry
 */
int write_data(void *vid, uint64_t offset, uint32_t size, const uint8_t* data) {
	LOG_AVG_TILL_END_OF_SCOPE0("write_data");
	int status;
	inodedata *id = (inodedata*) vid;
	if (id == NULL) {
		return LIZARDFS_ERROR_IO;
	}

	Glock lock(gMutex);
	status = id->status;
	if (status == LIZARDFS_STATUS_OK) {
		if (offset + size > id->maxfleng) { // move fleng
			id->maxfleng = offset + size;
		}
		id->writewaiting++;
		// Don't interleave with an in-progress flush/truncate.
		while (id->flushwaiting > 0) {
			id->writecond.wait(lock);
		}
		id->writewaiting--;
	}
	lock.unlock();

	if (status != LIZARDFS_STATUS_OK) {
		return status;
	}

	return write_blocks(id, offset, size, data);
}
826
/* Registers a flusher; while flushwaiting > 0 new writers block in
 * write_data. glock: LOCKED */
static void write_data_flushwaiting_increase(inodedata *id, Glock&) {
	id->flushwaiting++;
}
830
write_data_flushwaiting_decrease(inodedata * id,Glock &)831 static void write_data_flushwaiting_decrease(inodedata *id, Glock&) {
832 id->flushwaiting--;
833 if (id->flushwaiting == 0 && id->writewaiting > 0) {
834 id->writecond.notify_all();
835 }
836 }
837
/* Adds one reference to the inode's write state. glock: LOCKED */
static void write_data_lcnt_increase(inodedata *id, Glock&) {
	id->lcnt++;
}
841
write_data_lcnt_decrease(inodedata * id,Glock & lock)842 static void write_data_lcnt_decrease(inodedata *id, Glock& lock) {
843 id->lcnt--;
844 if (id->lcnt == 0 && !id->inqueue && id->flushwaiting == 0 && id->writewaiting == 0) {
845 write_free_inodedata(id, lock);
846 }
847 }
848
write_data_new(uint32_t inode)849 void* write_data_new(uint32_t inode) {
850 inodedata* id;
851 Glock lock(gMutex);
852 id = write_get_inodedata(inode, lock);
853 if (id == NULL) {
854 return NULL;
855 }
856 write_data_lcnt_increase(id, lock);
857 return id;
858 }
859
/*! Waits until all cached data of the inode is written to chunkservers.
 *
 * Speeds up a job parked in the delayed queue (only when there were no
 * errors), then sleeps on flushcond until the inode leaves the queues.
 * Returns the inode's (possibly error) status.
 * glock: LOCKED (released while waiting on flushcond)
 */
static int write_data_flush(void* vid, Glock& lock) {
	inodedata* id = (inodedata*) vid;
	if (id == NULL) {
		return LIZARDFS_ERROR_IO;
	}

	write_data_flushwaiting_increase(id, lock);
	// If there are no errors (trycnt==0) and inode is waiting in the delayed queue, speed it up
	if (id->trycnt == 0 && delayed_queue_remove(id, lock)) {
		write_enqueue(id, lock);
	}
	// Wait for the data to be flushed
	while (id->inqueue) {
		id->flushcond.wait(lock);
	}
	write_data_flushwaiting_decrease(id, lock);
	return id->status;
}
878
/*! Public entry: flushes all cached writes of the inode handle 'vid' and
 * returns the inode's status. glock: UNLOCKED on entry */
int write_data_flush(void* vid) {
	Glock lock(gMutex);
	return write_data_flush(vid, lock);
}
883
write_data_getmaxfleng(uint32_t inode)884 uint64_t write_data_getmaxfleng(uint32_t inode) {
885 uint64_t maxfleng;
886 inodedata* id;
887 Glock lock(gMutex);
888 id = write_find_inodedata(inode, lock);
889 if (id) {
890 maxfleng = id->maxfleng;
891 } else {
892 maxfleng = 0;
893 }
894 return maxfleng;
895 }
896
write_data_flush_inode(uint32_t inode)897 int write_data_flush_inode(uint32_t inode) {
898 Glock lock(gMutex);
899 inodedata* id = write_find_inodedata(inode, lock);
900 if (id == NULL) {
901 return 0;
902 }
903 return write_data_flush(id, lock);
904 }
905
write_data_truncate(uint32_t inode,bool opened,uint32_t uid,uint32_t gid,uint64_t length,Attributes & attr)906 int write_data_truncate(uint32_t inode, bool opened, uint32_t uid, uint32_t gid, uint64_t length,
907 Attributes& attr) {
908 Glock lock(gMutex);
909
910 // 1. Flush writes but don't finish it completely - it'll be done at the end of truncate
911 inodedata* id = write_get_inodedata(inode, lock);
912 if (id == NULL) {
913 return LIZARDFS_ERROR_IO;
914 }
915 write_data_lcnt_increase(id, lock);
916 write_data_flushwaiting_increase(id, lock); // this will block any writing to this inode
917
918 int err = write_data_flush(id, lock);
919 if (err != 0) {
920 write_data_flushwaiting_decrease(id, lock);
921 write_data_lcnt_decrease(id, lock);
922 return err;
923 }
924
925 // 2. Send the request to master
926 uint8_t status;
927 bool writeNeeded;
928 uint64_t oldLength;
929 uint32_t lockId;
930 lock.unlock();
931 int retrySleepTime_us = 200000;
932 uint32_t retries = 0;
933 do {
934 status = fs_truncate(inode, opened, uid, gid, length, writeNeeded, attr, oldLength, lockId);
935 if (status != LIZARDFS_STATUS_OK) {
936 lzfs_pretty_syslog(LOG_INFO, "truncate file %" PRIu32 " to length %" PRIu64 ": %s (try %d/%d)",
937 inode, length, lizardfs_error_string(status), int(retries + 1), int(maxretries));
938 }
939 if (retries >= maxretries) {
940 break;
941 }
942 if (status == LIZARDFS_ERROR_LOCKED) {
943 sleep(1);
944 } else if (status == LIZARDFS_ERROR_CHUNKLOST || status == LIZARDFS_ERROR_NOTDONE) {
945 usleep(retrySleepTime_us);
946 retrySleepTime_us = std::min(2 * retrySleepTime_us, 60 * 1000000);
947 ++retries;
948 }
949 } while (status == LIZARDFS_ERROR_LOCKED || status == LIZARDFS_ERROR_CHUNKLOST || status == LIZARDFS_ERROR_NOTDONE);
950 lock.lock();
951 if (status != 0 || !writeNeeded) {
952 // Something failed or we have nothing to do more (master server managed to do the truncate)
953 write_data_flushwaiting_decrease(id, lock);
954 write_data_lcnt_decrease(id, lock);
955 if (status == LIZARDFS_STATUS_OK) {
956 return 0;
957 } else {
958 // status is now MFS status, so we cannot return any errno
959 throw UnrecoverableWriteException("fs_truncate failed", status);
960 }
961 }
962
963 // We have to write zeros in suitable region to update xor/ec parity parts.
964 // Let's calculate size of the region to be zeroed
965 uint64_t endOffset = std::min({
966 oldLength, // no further than to the end of the file
967 length + slice_traits::ec::kMaxDataCount * MFSBLOCKSIZE, // no more than the maximal stripe
968 (length + MFSCHUNKSIZE - 1) / MFSCHUNKSIZE * MFSCHUNKSIZE // no beyond the end of chunk
969 });
970
971 if (endOffset > length) {
972 // Something has to be written, so pass our lock to writing threads
973 sassert(id->dataChain.empty());
974 id->locator.reset(new TruncateWriteChunkLocator(inode, length / MFSCHUNKSIZE, lockId));
975
976 // And now pass block of zeros to writing threads
977 std::vector<uint8_t> zeros(endOffset - length, 0);
978 lock.unlock();
979 err = write_blocks(id, length, zeros.size(), zeros.data());
980 lock.lock();
981 if (err != 0) {
982 write_data_flushwaiting_decrease(id, lock);
983 write_data_lcnt_decrease(id, lock);
984 return err;
985 }
986
987 // Wait for writing threads to finish
988 err = write_data_flush(id, lock);
989 id->locator.reset();
990 if (err != 0) {
991 // unlock the chunk here?
992 write_data_flushwaiting_decrease(id, lock);
993 write_data_lcnt_decrease(id, lock);
994 return err;
995 }
996 }
997
998 // Now we can tell the master server to finish the truncate operation and then unblock the inode
999 lock.unlock();
1000 status = fs_truncateend(inode, uid, gid, length, lockId, attr);
1001 write_data_flushwaiting_decrease(id, lock);
1002 write_data_lcnt_decrease(id, lock);
1003
1004 if (status != LIZARDFS_STATUS_OK) {
1005 // status is now MFS status, so we cannot return any errno
1006 throw UnrecoverableWriteException("fs_truncateend failed", status);
1007 }
1008 return 0;
1009 }
1010
write_data_end(void * vid)1011 int write_data_end(void* vid) {
1012 Glock lock(gMutex);
1013 inodedata* id = (inodedata*) vid;
1014 if (id == NULL) {
1015 return LIZARDFS_ERROR_IO;
1016 }
1017 int status = write_data_flush(id, lock);
1018 write_data_lcnt_decrease(id, lock);
1019 return status;
1020 }
1021