/*
 Copyright 2005-2010 Jakub Kruszona-Zawadzki, Gemius SA, 2013-2017 Skytechnology sp. z o.o.

 This file was part of MooseFS and is part of LizardFS.

 LizardFS is free software: you can redistribute it and/or modify
 it under the terms of the GNU General Public License as published by
 the Free Software Foundation, version 3.

 LizardFS is distributed in the hope that it will be useful,
 but WITHOUT ANY WARRANTY; without even the implied warranty of
 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 GNU General Public License for more details.

 You should have received a copy of the GNU General Public License
 along with LizardFS. If not, see <http://www.gnu.org/licenses/>.
 */

#include "common/platform.h"
#include "mount/writedata.h"

#include <errno.h>
#include <inttypes.h>
#include <limits.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <list>
#include <memory>
#include <mutex>
#include <vector>

#include "common/chunk_connector.h"
#include "common/crc.h"
#include "common/datapack.h"
#include "common/exceptions.h"
#include "common/goal.h"
#include "common/massert.h"
#include "common/message_receive_buffer.h"
#include "common/mfserr.h"
#include "common/multi_buffer_writer.h"
#include "common/pcqueue.h"
#include "common/slice_traits.h"
#include "common/slogger.h"
#include "common/sockets.h"
#include "common/time_utils.h"
#include "devtools/request_log.h"
#include "mount/chunk_writer.h"
#include "mount/global_chunkserver_stats.h"
#include "mount/mastercomm.h"
#include "mount/readdata.h"
#include "mount/tweaks.h"
#include "mount/write_cache_block.h"
#include "protocol/cltocs.h"
#include "protocol/MFSCommunication.h"

#define IDLE_CONNECTION_TIMEOUT 6
#define IDHASHSIZE 256
#define IDHASH(inode) (((inode)*0xB239FB71)%IDHASHSIZE)
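// 0xB239FB71 is an odd multiplier, so (inode * 0xB239FB71) % IDHASHSIZE is a bijection
// on the low 8 bits of the inode number -- consecutive inode numbers are spread across
// different buckets instead of clustering in adjacent ones.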

namespace {

struct inodedata {
	uint32_t inode;
	uint64_t maxfleng;
	int status;
	uint16_t flushwaiting;
	uint16_t writewaiting;
	uint16_t lcnt;
	uint32_t trycnt;
	bool inqueue; // true if this inode is waiting in one of the queues or is being processed
	uint32_t minimumBlocksToWrite;
	std::list<WriteCacheBlock> dataChain;
	int alterations_in_chain; // number of adjacent blocks with different chunk ids in the chain
	std::condition_variable flushcond; // wait for !inqueue (flush)
	std::condition_variable writecond; // wait for flushwaiting==0 (write)
	inodedata *next;
	std::unique_ptr<WriteChunkLocator> locator;
	int newDataInChainPipe[2];
	bool workerWaitingForData;
	Timer lastWriteToDataChain;
	Timer lastWriteToChunkservers;

	inodedata(uint32_t inode)
			: inode(inode),
			  maxfleng(0),
			  status(LIZARDFS_STATUS_OK),
			  flushwaiting(0),
			  writewaiting(0),
			  lcnt(0),
			  trycnt(0),
			  inqueue(false),
			  minimumBlocksToWrite(1),
			  alterations_in_chain(),
			  next(nullptr),
			  workerWaitingForData(false) {
#ifdef _WIN32
		// We don't use workerWaitingForData and newDataInChainPipe on Cygwin because
		// Cygwin's implementation of mixed socket & pipe polling is very inefficient.
		// On the mingw platform pipes are unavailable.
		newDataInChainPipe[0] = newDataInChainPipe[1] = -1;
#else
		if (pipe(newDataInChainPipe) < 0) {
			lzfs_pretty_syslog(LOG_WARNING, "creating pipe error: %s", strerr(errno));
			newDataInChainPipe[0] = -1;
		}
#endif
	}

	~inodedata() {
		if (isDataChainPipeValid()) {
			close(newDataInChainPipe[0]);
			close(newDataInChainPipe[1]);
		}
	}

	/* glock: LOCKED */
	void wakeUpWorkerIfNecessary() {
		/*
		 * The write worker always looks at the first block in the chain, while we always
		 * modify or append the last block, so the worker has to be woken up only when
		 * the first block is also the last one, i.e. dataChain.size() == 1.
		 */
		if (workerWaitingForData && dataChain.size() == 1 && isDataChainPipeValid()) {
			if (write(newDataInChainPipe[1], " ", 1) != 1) {
				lzfs_pretty_syslog(LOG_ERR, "write pipe error: %s", strerr(errno));
			}
			workerWaitingForData = false;
		}
	}
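	// The pipe above implements the classic self-pipe trick: the read end is handed to
	// ChunkWriter (see InodeChunkWriter::processJob), so a worker sleeping inside
	// writer.processOperations() can be woken by the single byte written here instead
	// of waiting out its whole poll timeout.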

	/* glock: UNUSED */
	bool isDataChainPipeValid() const {
		return newDataInChainPipe[0] >= 0;
	}

	/*! Check if inode requires flushing all its data chain to chunkservers.
	 *
	 * Returns true if anyone requested a flush by calling write_data_flush
	 * or write_data_flush_inode, or if the data in the data chain is too old to be
	 * kept in our buffers any longer. If this function returns false, we write only
	 * full stripes from the data chain to chunkservers.
	 * glock: LOCKED
	 */
	bool requiresFlushing() const {
		return (flushwaiting > 0
				|| lastWriteToDataChain.elapsed_ms() >= kMaximumTimeInDataChainSinceLastWrite_ms
				|| lastWriteToChunkservers.elapsed_ms() >= kMaximumTimeInDataChainSinceLastFlush_ms);
	}

	void pushToChain(WriteCacheBlock &&block) {
		dataChain.push_back(std::move(block));
		if (dataChain.size() > 1 && dataChain.back().chunkIndex != std::next(dataChain.rbegin())->chunkIndex) {
			alterations_in_chain++;
		}
	}

	void popFromChain() {
		assert(dataChain.size() > 0);
		if (dataChain.size() > 1 && dataChain.front().chunkIndex != std::next(dataChain.begin())->chunkIndex) {
			alterations_in_chain--;
		}
		dataChain.pop_front();
	}

	void registerAlterationsInChain(int delta) {
		alterations_in_chain += delta;
	}

	bool hasMultipleChunkIdsInChain() const {
		return alterations_in_chain > 0;
	}
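	// alterations_in_chain counts boundaries between runs of equal chunk indices, e.g.
	// a chain with chunk indices [7, 7, 8, 8, 9] has two boundaries. A positive value
	// means the head of the chain belongs to a chunk that will get no more data, so it
	// should be flushed without waiting.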

private:
	/*! Limit for \p lastWriteToChunkservers after which we force a flush.
	 *
	 * Maximum time for data to be kept in the data chain waiting for a full stripe
	 * to be collected.
	 */
	static const uint32_t kMaximumTimeInDataChainSinceLastFlush_ms = 15000;

	/*! Limit for \p lastWriteToDataChain after which we force a flush.
	 *
	 * Maximum time for data to be kept in the data chain waiting for a full stripe
	 * to be collected if no new data is written into the data chain.
	 */
	static const uint32_t kMaximumTimeInDataChainSinceLastWrite_ms = 5000;
};

struct DelayedQueueEntry {
	inodedata *inodeData;
	int32_t ticksLeft;
	static constexpr int kTicksPerSecond = 10;

	DelayedQueueEntry(inodedata *inodeData, int32_t ticksLeft)
			: inodeData(inodeData),
			  ticksLeft(ticksLeft) {
	}
};
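// An entry enqueued with a delay of N seconds starts with ticksLeft = N * kTicksPerSecond;
// delayed_queue_worker decrements it once per ~100 ms tick, so the inode is re-queued for
// processing roughly N seconds later.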

} // anonymous namespace

static std::atomic<uint32_t> maxretries;
static std::mutex gMutex;
typedef std::unique_lock<std::mutex> Glock;

static std::condition_variable fcbcond;
static uint32_t fcbwaiting = 0;
static int64_t freecacheblocks;
static inodedata **idhash;

static uint32_t gWriteWindowSize;
static uint32_t gChunkserverTimeout_ms;

// percentage of the free cache (1% - 100%) which can be used by one inode
static uint32_t gCachePerInodePercentage;

static pthread_t delayed_queue_worker_th;
static std::vector<pthread_t> write_worker_th;

static void* jqueue;
static std::list<DelayedQueueEntry> delayedQueue;

static ConnectionPool gChunkserverConnectionPool;
static ChunkConnectorUsingPool gChunkConnector(gChunkserverConnectionPool);

void write_cb_release_blocks(uint32_t count, Glock&) {
	freecacheblocks += count;
	if (fcbwaiting > 0 && freecacheblocks > 0) {
		fcbcond.notify_all();
	}
}

void write_cb_acquire_blocks(uint32_t count, Glock&) {
	freecacheblocks -= count;
}

void write_cb_wait_for_block(inodedata* id, Glock& glock) {
	LOG_AVG_TILL_END_OF_SCOPE0("write_cb_wait_for_block");
	fcbwaiting++;
	uint64_t dataChainSize = id->dataChain.size();
	while (freecacheblocks <= 0
			// The intended condition is:
			//   dataChainSize / (dataChainSize + freecacheblocks) > gCachePerInodePercentage / 100
			// but in integer arithmetic both sides would truncate to zero ("0 > 0"),
			// so it is written in the equivalent cross-multiplied form:
			|| dataChainSize * 100 > (dataChainSize + freecacheblocks) * gCachePerInodePercentage)
	{
		fcbcond.wait(glock);
	}
	fcbwaiting--;
}
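// Example: with gCachePerInodePercentage = 25 and freecacheblocks = 300, an inode whose
// chain holds 100 blocks passes (100 * 100 <= 400 * 25), but at 101 blocks it waits
// (101 * 100 > 401 * 25) -- one inode can hold at most ~25% of (its chain + free pool).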

/* inode */

inodedata* write_find_inodedata(uint32_t inode, Glock&) {
	uint32_t idh = IDHASH(inode);
	for (inodedata* id = idhash[idh]; id; id = id->next) {
		if (id->inode == inode) {
			return id;
		}
	}
	return NULL;
}

inodedata* write_get_inodedata(uint32_t inode, Glock&) {
	uint32_t idh = IDHASH(inode);
	inodedata* id;
	for (id = idhash[idh]; id; id = id->next) {
		if (id->inode == inode) {
			return id;
		}
	}
	id = new inodedata(inode);
	id->next = idhash[idh];
	idhash[idh] = id;
	return id;
}

void write_free_inodedata(inodedata* fid, Glock&) {
	uint32_t idh = IDHASH(fid->inode);
	inodedata *id, **idp;
	idp = &(idhash[idh]);
	while ((id = *idp)) {
		if (id == fid) {
			*idp = id->next;
			delete id;
			return;
		}
		idp = &(id->next);
	}
}

/* delayed queue */

static void delayed_queue_put(inodedata* id, uint32_t seconds, Glock&) {
	delayedQueue.push_back(DelayedQueueEntry(id, seconds * DelayedQueueEntry::kTicksPerSecond));
}

static bool delayed_queue_remove(inodedata* id, Glock&) {
	for (auto it = delayedQueue.begin(); it != delayedQueue.end(); ++it) {
		if (it->inodeData == id) {
			delayedQueue.erase(it);
			return true;
		}
	}
	return false;
}

void* delayed_queue_worker(void*) {
	for (;;) {
		Timeout timeout(std::chrono::microseconds(1000000 / DelayedQueueEntry::kTicksPerSecond));
		Glock lock(gMutex);
		auto it = delayedQueue.begin();
		while (it != delayedQueue.end()) {
			if (it->inodeData == NULL) {
				return NULL;
			}
			if (--it->ticksLeft <= 0) {
				queue_put(jqueue, 0, 0, reinterpret_cast<uint8_t*>(it->inodeData), 0);
				it = delayedQueue.erase(it);
			} else {
				++it;
			}
		}
		lock.unlock();
		usleep(timeout.remaining_us());
	}
	return NULL;
}

/* queues */

void write_delayed_enqueue(inodedata* id, uint32_t seconds, Glock& lock) {
	if (seconds > 0) {
		delayed_queue_put(id, seconds, lock);
	} else {
		queue_put(jqueue, 0, 0, (uint8_t*) id, 0);
	}
}

void write_enqueue(inodedata* id, Glock&) {
	queue_put(jqueue, 0, 0, (uint8_t*) id, 0);
}

void write_job_delayed_end(inodedata* id, int status, int seconds, Glock &lock) {
	LOG_AVG_TILL_END_OF_SCOPE0("write_job_delayed_end");
	LOG_AVG_TILL_END_OF_SCOPE1("write_job_delayed_end#sec", seconds);
	id->locator.reset();
	if (status != LIZARDFS_STATUS_OK) {
		lzfs_pretty_syslog(LOG_WARNING, "error writing file number %" PRIu32 ": %s", id->inode, lizardfs_error_string(status));
		id->status = status;
	}
	status = id->status;
	if (id->requiresFlushing()) {
		// Don't sleep if we have to write all the data immediately
		seconds = 0;
	}
	if (!id->dataChain.empty() && status == LIZARDFS_STATUS_OK) { // still have some work to do
		id->trycnt = 0; // on a good write reset the try counter
		write_delayed_enqueue(id, seconds, lock);
	} else {        // no more work or an error occurred
		// if this is an error then release all data blocks
		write_cb_release_blocks(id->dataChain.size(), lock);
		id->dataChain.clear();
		id->inqueue = false;
		id->maxfleng = 0; // proper file length is now on the master server, remove our length cache
		if (id->flushwaiting > 0) {
			id->flushcond.notify_all();
		}
	}
}

void write_job_end(inodedata *id, int status, Glock &lock) {
	write_job_delayed_end(id, status, 0, lock);
}

class InodeChunkWriter {
public:
	InodeChunkWriter() : inodeData_(nullptr), chunkIndex_(0) {}
	void processJob(inodedata* data);

private:
	void processDataChain(ChunkWriter& writer);
	void returnJournalToDataChain(std::list<WriteCacheBlock>&& journal, Glock&);
	bool haveAnyBlockInCurrentChunk(Glock&);
	bool haveBlockWorthWriting(uint32_t unfinishedOperationCount, Glock&);
	inodedata* inodeData_;
	uint32_t chunkIndex_;
	Timer wholeOperationTimer;

	// Maximum time of writing one chunk
	static const uint32_t kMaximumTime = 30;
	static const uint32_t kMaximumTimeWhenJobsWaiting = 10;
	// For the last 'kTimeToFinishOperations' seconds of maximumTime we won't start new operations
	static const uint32_t kTimeToFinishOperations = 5;
};
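// Timing policy of a single job: a worker may spend up to kMaximumTime (30 s) on one
// chunk and starts no new operations during the last kTimeToFinishOperations (5 s).
// When other jobs are waiting in the queue the budget shrinks to
// kMaximumTimeWhenJobsWaiting (10 s), but never below "elapsed + 5 s", so operations
// already in flight still get time to complete.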

void InodeChunkWriter::processJob(inodedata* inodeData) {
	LOG_AVG_TILL_END_OF_SCOPE0("InodeChunkWriter::processJob");
	inodeData_ = inodeData;

	// First, choose index of some chunk to write
	Glock lock(gMutex);
	int status = inodeData_->status;
	bool haveDataToWrite;
	if (inodeData_->locator) {
		// There is a chunk lock left by a previous unfinished job -- let's finish it!
		chunkIndex_ = inodeData_->locator->chunkIndex();
		haveDataToWrite = haveAnyBlockInCurrentChunk(lock);
	} else if (!inodeData_->dataChain.empty()) {
		// There is no unfinished job, but there is some data to write -- let's start a new job
		chunkIndex_ = inodeData_->dataChain.front().chunkIndex;
		haveDataToWrite = true;
	} else {
		// No data, no unfinished jobs -- something is wrong!
		// This should never happen, so the status doesn't really matter
		lzfs_pretty_syslog(LOG_WARNING, "got inode with no data to write!!!");
		haveDataToWrite = false;
		status = LIZARDFS_ERROR_EINVAL;
	}
	if (status != LIZARDFS_STATUS_OK) {
		write_job_end(inodeData_, status, lock);
		return;
	}
	lock.unlock();

	/* Process the job */
	ChunkWriter writer(globalChunkserverStats, gChunkConnector, inodeData_->newDataInChainPipe[0]);
	wholeOperationTimer.reset();
	std::unique_ptr<WriteChunkLocator> locator = std::move(inodeData_->locator);
	if (!locator) {
		locator.reset(new WriteChunkLocator());
	}

	try {
		try {
			locator->locateAndLockChunk(inodeData_->inode, chunkIndex_);

			// Optimization -- talk with chunkservers only if we have to write any data.
			// Don't do this if we only have to release a chunk lock left over from a
			// previous job.
			if (haveDataToWrite) {
				writer.init(locator.get(), gChunkserverTimeout_ms);
				processDataChain(writer);
				writer.finish(kTimeToFinishOperations * 1000);

				Glock lock(gMutex);
				returnJournalToDataChain(writer.releaseJournal(), lock);
			}
			locator->unlockChunk();
			read_inode_ops(inodeData_->inode);

			Glock lock(gMutex);
			inodeData_->minimumBlocksToWrite = writer.getMinimumBlockCountWorthWriting();
			bool canWait = !inodeData_->requiresFlushing();
			if (!haveAnyBlockInCurrentChunk(lock)) {
				// There is no need to wait if we have just finished writing some chunk.
				// Let's immediately start writing the next chunk (if there is any).
				canWait = false;
			}
			if (inodeData_->hasMultipleChunkIdsInChain()) {
				// Don't wait if there is more than one chunk in the data chain -- the first chunk
				// has to be flushed, because no more data will be added to it
				canWait = false;
			}
			write_job_delayed_end(inodeData_, LIZARDFS_STATUS_OK, (canWait ? 1 : 0), lock);
		} catch (Exception& e) {
			std::string errorString = e.what();
			Glock lock(gMutex);
			if (e.status() != LIZARDFS_ERROR_LOCKED) {
				inodeData_->trycnt++;
				errorString += " (try counter: " + std::to_string(inodeData_->trycnt) + ")";
			} else if (inodeData_->trycnt == 0) {
				// Set to nonzero to inform writers that this task needs to wait a bit.
				// Don't increase -- LIZARDFS_ERROR_LOCKED means that the chunk is locked
				// by a different client and we have to wait until it is unlocked
				inodeData_->trycnt = 1;
			}
			// Keep the lock
			inodeData_->locator = std::move(locator);
			// Move data left in the journal into the front of the write cache
			returnJournalToDataChain(writer.releaseJournal(), lock);
			lock.unlock();

			lzfs_pretty_syslog(LOG_WARNING, "write file error, inode: %" PRIu32 ", index: %" PRIu32 " - %s",
					inodeData_->inode, chunkIndex_, errorString.c_str());
			if (inodeData_->trycnt >= maxretries) {
				// Convert the error to an unrecoverable error
				throw UnrecoverableWriteException(e.message(), e.status());
			} else {
				// This may be a recoverable or an unrecoverable error
				throw;
			}
		}
	} catch (UnrecoverableWriteException& e) {
		Glock lock(gMutex);
		if (e.status() == LIZARDFS_ERROR_ENOENT) {
			write_job_end(inodeData_, LIZARDFS_ERROR_EBADF, lock);
		} else if (e.status() == LIZARDFS_ERROR_QUOTA) {
			write_job_end(inodeData_, LIZARDFS_ERROR_QUOTA, lock);
		} else if (e.status() == LIZARDFS_ERROR_NOSPACE || e.status() == LIZARDFS_ERROR_NOCHUNKSERVERS) {
			write_job_end(inodeData_, LIZARDFS_ERROR_NOSPACE, lock);
		} else {
			write_job_end(inodeData_, LIZARDFS_ERROR_IO, lock);
		}
	} catch (Exception& e) {
		Glock lock(gMutex);
		int waitTime = 1;
		if (inodeData_->trycnt > 10) {
			waitTime = std::min<int>(10, inodeData_->trycnt - 9);
		}
		write_delayed_enqueue(inodeData_, waitTime, lock);
	}
}
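// Retry backoff implemented above: the first ten failed attempts are re-queued after
// 1 second; from the eleventh attempt on, the delay grows linearly as (trycnt - 9)
// seconds, capped at 10 seconds, until trycnt reaches the maxretries limit.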

void InodeChunkWriter::processDataChain(ChunkWriter& writer) {
	LOG_AVG_TILL_END_OF_SCOPE0("InodeChunkWriter::processDataChain");
	uint32_t maximumTime = kMaximumTime;
	bool otherJobsAreWaiting = false;
	while (true) {
		bool newOtherJobsAreWaiting = !queue_isempty(jqueue);
		if (!otherJobsAreWaiting && newOtherJobsAreWaiting) {
			// Some new jobs have just arrived in the queue -- we should finish faster.
			maximumTime = kMaximumTimeWhenJobsWaiting;
			// But we need at least 5 seconds to finish the operations that are in progress
			uint32_t elapsedSeconds = wholeOperationTimer.elapsed_s();
			if (elapsedSeconds + kTimeToFinishOperations >= maximumTime) {
				maximumTime = elapsedSeconds + kTimeToFinishOperations;
			}
		}
		otherJobsAreWaiting = newOtherJobsAreWaiting;

		// If we have sent the previous message and have some time left, we can take
		// another block from the current chunk to process it simultaneously. We won't take
		// anything new if we've already sent 'gWriteWindowSize' blocks and didn't receive
		// status from the chunkserver.
		bool can_expect_next_block = true;
		if (wholeOperationTimer.elapsed_s() + kTimeToFinishOperations < maximumTime
				&& writer.acceptsNewOperations()) {
			Glock lock(gMutex);
			// While there is any block worth sending, we add a new write operation
			while (haveBlockWorthWriting(writer.getUnfinishedOperationsCount(), lock)) {
				// Remove the block from the cache and pass it to the writer
				writer.addOperation(std::move(inodeData_->dataChain.front()));
				inodeData_->popFromChain();
				write_cb_release_blocks(1, lock);
			}
			if (inodeData_->requiresFlushing() && !haveAnyBlockInCurrentChunk(lock)) {
				// No more data in this chunk and a flush is required, so flush everything
				writer.startFlushMode();
			}
			if (writer.getUnfinishedOperationsCount() < gWriteWindowSize) {
				inodeData_->workerWaitingForData = true;
			}
			can_expect_next_block = haveAnyBlockInCurrentChunk(lock);
		} else if (writer.acceptsNewOperations()) {
			// We are running out of time...
			Glock lock(gMutex);
			if (!inodeData_->requiresFlushing()) {
				// Nobody is waiting for the data to be flushed and the data in the write
				// chain isn't too old. Let's postpone any operations that didn't start yet
				// and finish them in the next time slice for this chunk
				writer.dropNewOperations();
			} else {
				// Somebody is waiting for a flush, so we have to finish writing everything.
				writer.startFlushMode();
			}
			can_expect_next_block = haveAnyBlockInCurrentChunk(lock);
		}

		if (writer.startNewOperations(can_expect_next_block) > 0) {
			Glock lock(gMutex);
			inodeData_->lastWriteToChunkservers.reset();
		}
		if (writer.getPendingOperationsCount() == 0) {
			return;
		} else if (wholeOperationTimer.elapsed_s() >= maximumTime) {
			throw RecoverableWriteException(
					"Timeout after " + std::to_string(wholeOperationTimer.elapsed_ms()) + " ms",
					LIZARDFS_ERROR_TIMEOUT);
		}

		// Sleep a bit shorter if we can't be woken up by the pipe, to reduce latency
		writer.processOperations(inodeData_->isDataChainPipeValid() ? 50 : 10);
	}
}

void InodeChunkWriter::returnJournalToDataChain(std::list<WriteCacheBlock> &&journal, Glock &lock) {
	if (!journal.empty()) {
		write_cb_acquire_blocks(journal.size(), lock);
		uint64_t prev_id = journal.front().chunkIndex;
		int alterations = (!inodeData_->dataChain.empty()
				&& journal.back().chunkIndex != inodeData_->dataChain.front().chunkIndex) ? 1 : 0;
		for (auto it = std::next(journal.begin()); it != journal.end(); ++it) {
			if (it->chunkIndex != prev_id) {
				alterations++;
				prev_id = it->chunkIndex;
			}
		}
		inodeData_->dataChain.splice(inodeData_->dataChain.begin(), std::move(journal));
		inodeData_->registerAlterationsInChain(alterations);
	}
}

/*
 * Check if there is any data in the same chunk waiting to be written.
 */
bool InodeChunkWriter::haveAnyBlockInCurrentChunk(Glock&) {
	if (inodeData_->dataChain.empty()) {
		return false;
	} else {
		return inodeData_->dataChain.front().chunkIndex == chunkIndex_;
	}
}

/*
 * Check if there is any data worth sending to the chunkserver.
 * We avoid sending blocks of a size different than MFSBLOCKSIZE;
 * these are taken only if we are close to running out of tasks to do.
 * glock: LOCKED
 */
bool InodeChunkWriter::haveBlockWorthWriting(uint32_t unfinishedOperationCount, Glock& lock) {
	if (!haveAnyBlockInCurrentChunk(lock)) {
		return false;
	}
	const auto& block = inodeData_->dataChain.front();
	if (block.type != WriteCacheBlock::kWritableBlock) {
		// Always write data that was previously written
		return true;
	} else if (unfinishedOperationCount >= gWriteWindowSize) {
		// Don't start new operations if there is already a lot of pending writes
		return false;
	} else {
		// Always start full blocks; start partial blocks only if we have to flush the data
		// or the block can no longer be expanded to a full one (only the last block in the
		// chain can be expanded)
		return (block.size() == MFSBLOCKSIZE
				|| inodeData_->requiresFlushing()
				|| inodeData_->dataChain.size() > 1);
	}
}
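// Decision summary: journal blocks (already written once) are always resent and are not
// limited by the write window; writable blocks are sent only when the window
// (gWriteWindowSize unconfirmed operations) has room and the block is full, a flush is
// pending or overdue, or a newer block already follows it in the chain.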

/* main working thread | glock:UNLOCKED */
void* write_worker(void*) {
	InodeChunkWriter inodeDataWriter;
	for (;;) {
		// get next job
		uint32_t z1, z2, z3;
		uint8_t *data;
		{
			LOG_AVG_TILL_END_OF_SCOPE0("write_worker#idle");
			queue_get(jqueue, &z1, &z2, &data, &z3);
		}
		if (data == NULL) {
			return NULL;
		}

		// process the job
		LOG_AVG_TILL_END_OF_SCOPE0("write_worker#working");
		inodeDataWriter.processJob((inodedata*) data);
	}
	return NULL;
}

/* API | glock: INITIALIZED,UNLOCKED */
void write_data_init(uint32_t cachesize, uint32_t retries, uint32_t workers,
		uint32_t writewindowsize, uint32_t chunkserverTimeout_ms, uint32_t cachePerInodePercentage) {
	uint64_t cachebytecount = uint64_t(cachesize) * 1024 * 1024;
	uint64_t cacheblockcount = (cachebytecount / MFSBLOCKSIZE);
	uint32_t i;
	pthread_attr_t thattr;

	gChunkConnector.setSourceIp(fs_getsrcip());
	gWriteWindowSize = writewindowsize;
	gChunkserverTimeout_ms = chunkserverTimeout_ms;
	maxretries = retries;
	if (cacheblockcount < 10) {
		cacheblockcount = 10;
	}

	freecacheblocks = cacheblockcount;
	gCachePerInodePercentage = cachePerInodePercentage;

	idhash = (inodedata**) malloc(sizeof(inodedata*) * IDHASHSIZE);
	for (i = 0; i < IDHASHSIZE; i++) {
		idhash[i] = NULL;
	}

	jqueue = queue_new(0);

	pthread_attr_init(&thattr);
	pthread_attr_setstacksize(&thattr, 0x100000);
	pthread_create(&delayed_queue_worker_th, &thattr, delayed_queue_worker, NULL);
	write_worker_th.resize(workers);
	for (auto& th : write_worker_th) {
		pthread_create(&th, &thattr, write_worker, NULL);
	}
	pthread_attr_destroy(&thattr);

	gTweaks.registerVariable("WriteMaxRetries", maxretries);
}
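// Illustrative call (parameter values are hypothetical, not shipped defaults): a 128 MiB
// write cache split into 64 KiB blocks, up to 30 retries, 10 worker threads, a window of
// 1000 unconfirmed write operations, a 5 s chunkserver timeout and at most 25% of the
// cache usable by a single inode:
//   write_data_init(128, 30, 10, 1000, 5000, 25);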

void write_data_term(void) {
	uint32_t i;
	inodedata *id, *idn;

	{
		Glock lock(gMutex);
		delayed_queue_put(nullptr, 0, lock);
	}
	for (i = 0; i < write_worker_th.size(); i++) {
		queue_put(jqueue, 0, 0, NULL, 0);
	}
	for (i = 0; i < write_worker_th.size(); i++) {
		pthread_join(write_worker_th[i], NULL);
	}
	pthread_join(delayed_queue_worker_th, NULL);
	queue_delete(jqueue, queue_deleter_delete<inodedata>);
	for (i = 0; i < IDHASHSIZE; i++) {
		for (id = idhash[i]; id; id = idn) {
			idn = id->next;
			delete id;
		}
	}
	free(idhash);
}

/* glock: UNLOCKED */
int write_block(inodedata *id, uint32_t chindx, uint16_t pos, uint32_t from, uint32_t to, const uint8_t *data) {
	Glock lock(gMutex);
	id->lastWriteToDataChain.reset();

	// Try to expand the last block
	if (!id->dataChain.empty()) {
		auto& lastBlock = id->dataChain.back();
		if (lastBlock.chunkIndex == chindx
				&& lastBlock.blockIndex == pos
				&& lastBlock.type == WriteCacheBlock::kWritableBlock
				&& lastBlock.expand(from, to, data)) {
			id->wakeUpWorkerIfNecessary();
			return 0;
		}
	}

	// Didn't manage to expand an existing block, so allocate a new one
	write_cb_wait_for_block(id, lock);
	write_cb_acquire_blocks(1, lock);
	id->pushToChain(WriteCacheBlock(chindx, pos, WriteCacheBlock::kWritableBlock));
	sassert(id->dataChain.back().expand(from, to, data));
	if (id->inqueue) {
		// Consider some speedup if there are no errors and:
		// - there are a lot of blocks in the write chain, or
		// - there are at least two chunks in the write chain
		if (id->trycnt == 0 && (id->dataChain.size() > id->minimumBlocksToWrite
			|| id->dataChain.front().chunkIndex != id->dataChain.back().chunkIndex)) {
			if (delayed_queue_remove(id, lock)) {
				write_enqueue(id, lock);
			}
		}
		id->wakeUpWorkerIfNecessary();
	} else {
		id->inqueue = true;
		write_enqueue(id, lock);
	}
	return 0;
}

/* glock: UNLOCKED */
int write_blocks(inodedata *id, uint64_t offset, uint32_t size, const uint8_t* data) {
	LOG_AVG_TILL_END_OF_SCOPE0("write_blocks");
	uint32_t chindx = offset >> MFSCHUNKBITS;
	uint16_t pos = (offset & MFSCHUNKMASK) >> MFSBLOCKBITS;
	uint32_t from = offset & MFSBLOCKMASK;
	while (size > 0) {
		if (size > MFSBLOCKSIZE - from) {
			if (write_block(id, chindx, pos, from, MFSBLOCKSIZE, data) < 0) {
				return LIZARDFS_ERROR_IO;
			}
			size -= (MFSBLOCKSIZE - from);
			data += (MFSBLOCKSIZE - from);
			from = 0;
			pos++;
			if (pos == MFSBLOCKSINCHUNK) {
				pos = 0;
				chindx++;
			}
		} else {
			if (write_block(id, chindx, pos, from, from + size, data) < 0) {
				return LIZARDFS_ERROR_IO;
			}
			size = 0;
		}
	}
	return 0;
}
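// Offset decomposition example, assuming the standard 64 MiB chunks and 64 KiB blocks
// (MFSCHUNKBITS == 26, MFSBLOCKBITS == 16): offset 0x4012345 maps to chunk
// chindx = 0x4012345 >> 26 = 1, block pos = (0x4012345 & 0x3FFFFFF) >> 16 = 1 and
// intra-block offset from = 0x4012345 & 0xFFFF = 0x2345.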

int write_data(void *vid, uint64_t offset, uint32_t size, const uint8_t* data) {
	LOG_AVG_TILL_END_OF_SCOPE0("write_data");
	int status;
	inodedata *id = (inodedata*) vid;
	if (id == NULL) {
		return LIZARDFS_ERROR_IO;
	}

	Glock lock(gMutex);
	status = id->status;
	if (status == LIZARDFS_STATUS_OK) {
		if (offset + size > id->maxfleng) {     // extend the cached file length
			id->maxfleng = offset + size;
		}
		id->writewaiting++;
		while (id->flushwaiting > 0) {
			id->writecond.wait(lock);
		}
		id->writewaiting--;
	}
	lock.unlock();

	if (status != LIZARDFS_STATUS_OK) {
		return status;
	}

	return write_blocks(id, offset, size, data);
}

static void write_data_flushwaiting_increase(inodedata *id, Glock&) {
	id->flushwaiting++;
}

static void write_data_flushwaiting_decrease(inodedata *id, Glock&) {
	id->flushwaiting--;
	if (id->flushwaiting == 0 && id->writewaiting > 0) {
		id->writecond.notify_all();
	}
}

static void write_data_lcnt_increase(inodedata *id, Glock&) {
	id->lcnt++;
}

static void write_data_lcnt_decrease(inodedata *id, Glock& lock) {
	id->lcnt--;
	if (id->lcnt == 0 && !id->inqueue && id->flushwaiting == 0 && id->writewaiting == 0) {
		write_free_inodedata(id, lock);
	}
}
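// lcnt is the reference count of handles returned by write_data_new(); the inodedata
// record is freed only when the last handle is released and there is no queued work
// and nobody waits on the inode's condition variables.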

void* write_data_new(uint32_t inode) {
	inodedata* id;
	Glock lock(gMutex);
	id = write_get_inodedata(inode, lock);
	if (id == NULL) {
		return NULL;
	}
	write_data_lcnt_increase(id, lock);
	return id;
}
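// Typical call sequence from the filesystem layer (a sketch; the actual glue lives in
// the FUSE entry points, not in this file):
//   void *h = write_data_new(inode);      // on open()
//   write_data(h, offset, size, buffer);  // on each write()
//   write_data_flush(h);                  // on fsync()
//   write_data_end(h);                    // on release(); flushes and drops the reference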

static int write_data_flush(void* vid, Glock& lock) {
	inodedata* id = (inodedata*) vid;
	if (id == NULL) {
		return LIZARDFS_ERROR_IO;
	}

	write_data_flushwaiting_increase(id, lock);
	// If there are no errors (trycnt==0) and the inode is waiting in the delayed queue, speed it up
	if (id->trycnt == 0 && delayed_queue_remove(id, lock)) {
		write_enqueue(id, lock);
	}
	// Wait for the data to be flushed
	while (id->inqueue) {
		id->flushcond.wait(lock);
	}
	write_data_flushwaiting_decrease(id, lock);
	return id->status;
}

int write_data_flush(void* vid) {
	Glock lock(gMutex);
	return write_data_flush(vid, lock);
}

uint64_t write_data_getmaxfleng(uint32_t inode) {
	uint64_t maxfleng;
	inodedata* id;
	Glock lock(gMutex);
	id = write_find_inodedata(inode, lock);
	if (id) {
		maxfleng = id->maxfleng;
	} else {
		maxfleng = 0;
	}
	return maxfleng;
}

int write_data_flush_inode(uint32_t inode) {
	Glock lock(gMutex);
	inodedata* id = write_find_inodedata(inode, lock);
	if (id == NULL) {
		return 0;
	}
	return write_data_flush(id, lock);
}

int write_data_truncate(uint32_t inode, bool opened, uint32_t uid, uint32_t gid, uint64_t length,
		Attributes& attr) {
	Glock lock(gMutex);

	// 1. Flush pending writes, but don't finish the operation completely -- it will be
	// completed at the end of the truncate
	inodedata* id = write_get_inodedata(inode, lock);
	if (id == NULL) {
		return LIZARDFS_ERROR_IO;
	}
	write_data_lcnt_increase(id, lock);
	write_data_flushwaiting_increase(id, lock); // this will block any writing to this inode

	int err = write_data_flush(id, lock);
	if (err != 0) {
		write_data_flushwaiting_decrease(id, lock);
		write_data_lcnt_decrease(id, lock);
		return err;
	}

	// 2. Send the request to the master
	uint8_t status;
	bool writeNeeded;
	uint64_t oldLength;
	uint32_t lockId;
	lock.unlock();
	int retrySleepTime_us = 200000;
	uint32_t retries = 0;
	do {
		status = fs_truncate(inode, opened, uid, gid, length, writeNeeded, attr, oldLength, lockId);
		if (status != LIZARDFS_STATUS_OK) {
			lzfs_pretty_syslog(LOG_INFO, "truncate file %" PRIu32 " to length %" PRIu64 ": %s (try %d/%d)",
					inode, length, lizardfs_error_string(status), int(retries + 1), int(maxretries));
		}
		if (retries >= maxretries) {
			break;
		}
		if (status == LIZARDFS_ERROR_LOCKED) {
			sleep(1);
		} else if (status == LIZARDFS_ERROR_CHUNKLOST || status == LIZARDFS_ERROR_NOTDONE) {
			usleep(retrySleepTime_us);
			retrySleepTime_us = std::min(2 * retrySleepTime_us, 60 * 1000000);
			++retries;
		}
	} while (status == LIZARDFS_ERROR_LOCKED || status == LIZARDFS_ERROR_CHUNKLOST || status == LIZARDFS_ERROR_NOTDONE);
	lock.lock();
	if (status != 0 || !writeNeeded) {
		// Something failed or there is nothing more to do (the master server managed to do the truncate)
		write_data_flushwaiting_decrease(id, lock);
		write_data_lcnt_decrease(id, lock);
		if (status == LIZARDFS_STATUS_OK) {
			return 0;
		} else {
			// status is an MFS status code, not an errno, so we cannot return it directly
			throw UnrecoverableWriteException("fs_truncate failed", status);
		}
	}

	// We have to write zeros in the suitable region to update xor/ec parity parts.
	// Let's calculate the size of the region to be zeroed
	uint64_t endOffset = std::min({
		oldLength,                            // no further than the end of the file
		length + slice_traits::ec::kMaxDataCount * MFSBLOCKSIZE, // no more than one maximal stripe
		(length + MFSCHUNKSIZE - 1) / MFSCHUNKSIZE * MFSCHUNKSIZE // not beyond the end of the chunk
	});
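	// Example: truncating a file from 200 MiB down to 100 MiB + 1 B zeroes at most one
	// maximal stripe past the new length, and never past the end of the chunk that
	// contains the new length (here 128 MiB) or the old end of file.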

	if (endOffset > length) {
		// Something has to be written, so pass our lock to the writing threads
		sassert(id->dataChain.empty());
		id->locator.reset(new TruncateWriteChunkLocator(inode, length / MFSCHUNKSIZE, lockId));

		// And now pass a block of zeros to the writing threads
		std::vector<uint8_t> zeros(endOffset - length, 0);
		lock.unlock();
		err = write_blocks(id, length, zeros.size(), zeros.data());
		lock.lock();
		if (err != 0) {
			write_data_flushwaiting_decrease(id, lock);
			write_data_lcnt_decrease(id, lock);
			return err;
		}

		// Wait for the writing threads to finish
		err = write_data_flush(id, lock);
		id->locator.reset();
		if (err != 0) {
			// unlock the chunk here?
			write_data_flushwaiting_decrease(id, lock);
			write_data_lcnt_decrease(id, lock);
			return err;
		}
	}

	// Now we can tell the master server to finish the truncate operation and then unblock the inode
	lock.unlock();
	status = fs_truncateend(inode, uid, gid, length, lockId, attr);
	lock.lock(); // re-acquire the lock before touching shared inode state
	write_data_flushwaiting_decrease(id, lock);
	write_data_lcnt_decrease(id, lock);

	if (status != LIZARDFS_STATUS_OK) {
		// status is an MFS status code, not an errno, so we cannot return it directly
		throw UnrecoverableWriteException("fs_truncateend failed", status);
	}
	return 0;
}

int write_data_end(void* vid) {
	Glock lock(gMutex);
	inodedata* id = (inodedata*) vid;
	if (id == NULL) {
		return LIZARDFS_ERROR_IO;
	}
	int status = write_data_flush(id, lock);
	write_data_lcnt_decrease(id, lock);
	return status;
}