/*
   Copyright 2005-2010 Jakub Kruszona-Zawadzki, Gemius SA, 2013-2014 EditShare, 2013-2017 Skytechnology sp. z o.o..

   This file was part of MooseFS and is part of LizardFS.

   LizardFS is free software: you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation, version 3.

   LizardFS is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with LizardFS. If not, see <http://www.gnu.org/licenses/>.
 */

#include "common/platform.h"
#include "master/chunks.h"

#include <fcntl.h>
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <syslog.h>
#include <unistd.h>
#include <unordered_map>
#include <algorithm>
#include <deque>
// <functional> and <map> are used directly by chunk_repair() below
// (std::map with std::greater); they may arrive transitively through other
// headers, but include them explicitly to be safe.
#include <functional>
#include <map>

#include "common/chunks_availability_state.h"
#include "common/chunk_copies_calculator.h"
#include "common/compact_vector.h"
#include "common/counting_sort.h"
#include "common/coroutine.h"
#include "common/datapack.h"
#include "common/exceptions.h"
#include "common/event_loop.h"
#include "common/flat_set.h"
#include "common/goal.h"
#include "common/hashfn.h"
#include "common/lizardfs_version.h"
#include "common/loop_watchdog.h"
#include "common/massert.h"
#include "common/slice_traits.h"
#include "common/small_vector.h"
#include "master/chunkserver_db.h"
#include "master/checksum.h"
#include "master/chunk_goal_counters.h"
#include "master/filesystem.h"
#include "master/get_servers_for_new_chunk.h"
#include "master/goal_cache.h"
#include "protocol/MFSCommunication.h"

#ifdef METARESTORE
#  include <time.h>
#else
#  include "common/cfg.h"
#  include "common/main.h"
#  include "common/random.h"
#  include "master/matoclserv.h"
#  include "master/matocsserv.h"
#  include "master/topology.h"
#endif

#define MINLOOPTIME 1
#define MAXLOOPTIME 7200
#define MAXCPS 10000000
#define MINCPS 500
#define MINCHUNKSLOOPPERIOD 40
#define MAXCHUNKSLOOPPERIOD 10000
#define MINCHUNKSLOOPCPU    10
#define MAXCHUNKSLOOPCPU    90

#define HASHSIZE 0x100000
#define HASHPOS(chunkid) (((uint32_t)chunkid)&0xFFFFF)
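
// The chunk hash table has HASHSIZE == 0x100000 (2^20) buckets; HASHPOS simply
// keeps the low 20 bits of the chunk id (0xFFFFF == HASHSIZE - 1). Chunk ids
// are handed out sequentially from nextchunkid, so consecutive chunks land in
// consecutive buckets and the table stays evenly filled.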

#define CHECKSUMSEED 78765491511151883ULL

#ifndef METARESTORE

static uint32_t gRedundancyLevel;
static uint64_t gEndangeredChunksServingLimit;
static uint64_t gEndangeredChunksMaxCapacity;
static uint64_t gDisconnectedCounter = 0;
bool gAvoidSameIpChunkservers = false;

struct ChunkPart {
	enum {
		INVALID =
		    0,  /*!< Wrong version, or the chunkserver reported a problem (I/O error etc.)  ->  to delete. */
		DEL,    /*!< Deletion in progress. */
		BUSY,   /*!< Operation in progress. */
		VALID,  /*!< Ok. */
		TDBUSY, /*!< To delete + BUSY. */
		TDVALID /*!< Want to be deleted. */
	};
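
	// State machine implemented by the mark_/unmark_ methods below:
	//
	//   VALID  <-- mark_busy / unmark_busy -->  BUSY
	//     |                                       |
	//     mark_todel / unmark_todel               mark_todel / unmark_todel
	//     |                                       |
	//   TDVALID <-- mark_busy / unmark_busy --> TDBUSY
	//
	// INVALID and DEL parts only wait to be removed from the chunkserver
	// (though chunk_repair() may still revive an INVALID copy whose version
	// matches the recovered one).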

	uint32_t version;   /*!< Part version. */
	ChunkPartType type; /*!< Part type. */
	uint16_t csid : 13; /*!< Chunkserver id. */
	uint16_t state : 3; /*!< Chunk part state. */

	ChunkPart() : version(0), type(), csid(0), state(INVALID) {
	}

	ChunkPart(const ChunkPart &other)
	    : version(other.version), type(other.type), csid(other.csid), state(other.state) {
	}

	ChunkPart(uint16_t part_csid, int part_state, uint32_t part_version,
	          const ChunkPartType &part_type)
	    : version(part_version), type(part_type), csid(part_csid), state(part_state) {
	}

	bool is_busy() const {
		return state == BUSY || state == TDBUSY;
	}

	bool is_valid() const {
		return state != INVALID && state != DEL;
	}

	bool is_todel() const {
		return state == TDVALID || state == TDBUSY;
	}

	void mark_busy() {
		switch (state) {
		case VALID:
			state = BUSY;
			break;
		case TDVALID:
			state = TDBUSY;
			break;
		default:
			sassert(!"ChunkPart::mark_busy(): wrong state");
		}
	}
	void unmark_busy() {
		switch (state) {
		case BUSY:
			state = VALID;
			break;
		case TDBUSY:
			state = TDVALID;
			break;
		default:
			sassert(!"ChunkPart::unmark_busy(): wrong state");
		}
	}
	void mark_todel() {
		switch (state) {
		case VALID:
			state = TDVALID;
			break;
		case BUSY:
			state = TDBUSY;
			break;
		default:
			sassert(!"ChunkPart::mark_todel(): wrong state");
		}
	}
	void unmark_todel() {
		switch (state) {
		case TDVALID:
			state = VALID;
			break;
		case TDBUSY:
			state = BUSY;
			break;
		default:
			sassert(!"ChunkPart::unmark_todel(): wrong state");
		}
	}

	matocsserventry *server() const {
		assert(csdb_find(csid));
		assert(csdb_find(csid)->eptr);
		return csdb_find(csid)->eptr;
	}
};

static void*                         gChunkLoopEventHandle = NULL;

static uint32_t gOperationsDelayDisconnect = 3600;
static uint32_t gOperationsDelayInit = 300;

static uint32_t MaxWriteRepl;
static uint32_t MaxReadRepl;
static uint32_t MaxDelSoftLimit;
static uint32_t MaxDelHardLimit;
static double   TmpMaxDelFrac;
static uint32_t TmpMaxDel;
static uint32_t HashSteps;
static uint32_t HashCPS;
static uint32_t ChunksLoopPeriod;
static uint32_t ChunksLoopTimeout;
static double   gAcceptableDifference;
static bool     RebalancingBetweenLabels = false;

static uint32_t jobsnorepbefore;

static uint32_t starttime;
#endif // METARESTORE

class Chunk {
	static constexpr int kMaxStatCount = 15;
	static_assert(CHUNK_MATRIX_SIZE <= kMaxStatCount, "stats matrix size too big for internal stats storage");
	static_assert(ChunksAvailabilityState::kStateCount <= 3, "not enough space for chunk state");
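
	// kMaxStatCount == 15 because the per-chunk statistics below
	// (copiesInStats_, allMissingParts_, allRedundantParts_, allFullCopies_)
	// live in 4-bit bitfields; updateStats() clamps every computed value with
	// std::min(kMaxStatCount, ...) so they cannot overflow.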

public:
	/* chunk.operation */
	enum {
		NONE,
		CREATE,
		SET_VERSION,
		DUPLICATE,
		TRUNCATE,
		DUPTRUNC
	};

	uint64_t chunkid;
	uint64_t checksum;
	Chunk *next;
#ifndef METARESTORE
	compact_vector<ChunkPart> parts;
#endif
private: // public/private sections are mixed here to make the struct as small as possible
	ChunkGoalCounters goalCounters_;
public:
	uint32_t version;
	uint32_t lockid;
	uint32_t lockedto;
#ifndef METARESTORE
	uint8_t inEndangeredQueue:1;
	uint8_t needverincrease:1;
	uint8_t interrupted:1;
	uint8_t operation:3;
private:
	uint8_t allAvailabilityState_:2;
	uint8_t copiesInStats_:4;
	uint8_t allMissingParts_:4;
	uint8_t allRedundantParts_:4;
	uint8_t allFullCopies_:4;
#endif

public:
#ifndef METARESTORE
	static ChunksAvailabilityState allChunksAvailability;
	static ChunksReplicationState allChunksReplicationState;
	static uint64_t count;
	static uint64_t allFullChunkCopies[CHUNK_MATRIX_SIZE][CHUNK_MATRIX_SIZE];
	static std::deque<Chunk *> endangeredChunks;
	static GoalCache goalCache;
#endif

	void clear() {
		goalCounters_.clear();
		next = nullptr;
		chunkid = 0;
		version = 0;
		lockid = 0;
		lockedto = 0;
		checksum = 0;
#ifndef METARESTORE
		inEndangeredQueue = 0;
		needverincrease = 1;
		interrupted = 0;
		operation = Chunk::NONE;
		parts.clear();
		allMissingParts_ = 0;
		allRedundantParts_ = 0;
		allFullCopies_ = 0;
		allAvailabilityState_ = ChunksAvailabilityState::kSafe;
		copiesInStats_ = 0;
		count++;
		updateStats(false);
#endif
	}

	// Highest id of the chunk's goal
	// This function is preserved only for backward compatibility of metadata checksums
	// and shouldn't be used anywhere else.
	uint8_t highestIdGoal() const {
		return goalCounters_.highestIdGoal();
	}

	// Number of files this chunk belongs to
	uint32_t fileCount() const {
		return goalCounters_.fileCount();
	}

	// Called when this chunk becomes a part of a file with the given goal
	void addFileWithGoal(uint8_t goal) {
#ifndef METARESTORE
		removeFromStats();
#endif
		goalCounters_.addFile(goal);
#ifndef METARESTORE
		updateStats(false);
#endif
	}

	// Called when a file that this chunk belongs to is removed
	void removeFileWithGoal(uint8_t goal) {
#ifndef METARESTORE
		removeFromStats();
#endif
		goalCounters_.removeFile(goal);
#ifndef METARESTORE
		updateStats(false);
#endif
	}

	// Called when a file that this chunk belongs to changes goal
	void changeFileGoal(uint8_t prevGoal, uint8_t newGoal) {
#ifndef METARESTORE
		removeFromStats();
#endif
		goalCounters_.changeFileGoal(prevGoal, newGoal);
#ifndef METARESTORE
		updateStats(false);
#endif
	}

#ifndef METARESTORE
	Goal getGoal() {
		// Do not search for empty goalCounters in cache
		if (goalCounters_.size() == 0) {
			return Goal();
		}

		auto it = goalCache.find(goalCounters_);
		if (it != goalCache.end()) {
			return it->second;
		}

		Goal result;
		int prev_goal = -1;
		for (auto counter : goalCounters_) {
			const Goal &goal = fs_get_goal_definition(counter.goal);
			if (prev_goal != (int)counter.goal) {
				result.mergeIn(goal);
				prev_goal = counter.goal;
			}
		}

		goalCache.insert(goalCounters_, result);
		return result;
	}
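
	// Note: a chunk referenced by files with different goals gets the union
	// ("mergeIn") of all those goal definitions. Equal goal ids appear
	// consecutively in goalCounters_ (the prev_goal check above relies on
	// this), and the merged result is memoized in goalCache, keyed by the
	// whole set of counters.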

	// This method should be called when a chunk is removed
	void freeStats() {
		count--;
		removeFromStats();
	}

	// Updates statistics of all chunks
	void updateStats(bool remove_from_stats = true) {
		int oldAllMissingParts = allMissingParts_;

		if (remove_from_stats) {
			removeFromStats();
		}

		Goal g = getGoal();

		ChunkCopiesCalculator all(g);

		for (const auto &part : parts) {
			if (!part.is_valid()) {
				continue;
			}
			all.addPart(part.type, csdb_find(part.csid)->label);
		}

		all.optimize();

		allFullCopies_ = std::min(kMaxStatCount, all.getFullCopiesCount());
		allAvailabilityState_ = all.getState();
		allMissingParts_ = std::min(kMaxStatCount, all.countPartsToRecover());
		allRedundantParts_ = std::min(kMaxStatCount, all.countPartsToRemove());
		copiesInStats_ = std::min(kMaxStatCount, ChunkCopiesCalculator::getFullCopiesCount(g));

		/* Enqueue a chunk as endangered only if:
		 * 1. Endangered chunks prioritization is on (limit > 0)
		 * 2. Limit of endangered chunks in queue is not reached
		 * 3. Chunk has more missing parts than it used to
		 * 4. Chunk is endangered
		 * 5. It is not already in queue
		 * By checking conditions below we assert no repetitions in endangered queue. */
		if (gEndangeredChunksServingLimit > 0
				&& endangeredChunks.size() < gEndangeredChunksMaxCapacity
				&& allMissingParts_ > oldAllMissingParts
				&& allAvailabilityState_ == ChunksAvailabilityState::kEndangered
				&& !inEndangeredQueue) {
			inEndangeredQueue = 1;
			endangeredChunks.push_back(this);
		}

		addToStats();
	}

	bool isSafe() const {
		return allAvailabilityState_ == ChunksAvailabilityState::kSafe;
	}

	bool isEndangered() const {
		return allAvailabilityState_ == ChunksAvailabilityState::kEndangered;
	}

	bool isLost() const {
		return allAvailabilityState_ == ChunksAvailabilityState::kLost;
	}

	bool isWritable() {
		return !isLost();
	}

	int countMissingParts() const {
		return allMissingParts_;
	}

	int countRedundantParts() const {
		return allRedundantParts_;
	}

	uint8_t getFullCopiesCount() const {
		return allFullCopies_;
	}

	bool isLocked() const {
		return lockedto >= eventloop_time();
	}

	void markCopyAsHavingWrongVersion(ChunkPart &part) {
		part.state = ChunkPart::INVALID;
		updateStats();
	}

	void invalidateCopy(ChunkPart &part) {
		part.state = ChunkPart::INVALID;
		part.version = 0;
		updateStats();
	}

	void deleteCopy(ChunkPart &part) {
		part.state = ChunkPart::DEL;
		updateStats();
	}

private:
	ChunksAvailabilityState::State allCopiesState() const {
		return static_cast<ChunksAvailabilityState::State>(allAvailabilityState_);
	}

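	// removeFromStats()/addToStats() bracket every change to the chunk's goal
	// or parts: global per-goal availability/replication counters and the
	// allFullChunkCopies[expected][actual] histogram are updated by first
	// subtracting the old contribution and then adding the new one; both
	// histogram indices are clamped to CHUNK_MATRIX_SIZE - 1.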
	void removeFromStats() {
		int prev_goal = -1;
		for (const auto& counter : goalCounters_) {
			if (prev_goal == (int)counter.goal) {
				continue;
			}
			prev_goal = counter.goal;
			allChunksAvailability.removeChunk(counter.goal, allCopiesState());
			allChunksReplicationState.removeChunk(counter.goal, allMissingParts_, allRedundantParts_);
		}

		uint8_t limitedGoal = std::min<uint8_t>(CHUNK_MATRIX_SIZE - 1, copiesInStats_);
		uint8_t limitedAll = std::min<uint8_t>(CHUNK_MATRIX_SIZE - 1, allFullCopies_);
		allFullChunkCopies[limitedGoal][limitedAll]--;
	}

	void addToStats() {
		int prev_goal = -1;
		for (const auto& counter : goalCounters_) {
			if (prev_goal == (int)counter.goal) {
				continue;
			}
			prev_goal = counter.goal;
			allChunksAvailability.addChunk(counter.goal, allCopiesState());
			allChunksReplicationState.addChunk(counter.goal, allMissingParts_, allRedundantParts_);
		}

		uint8_t limitedGoal = std::min<uint8_t>(CHUNK_MATRIX_SIZE - 1, copiesInStats_);
		uint8_t limitedAll = std::min<uint8_t>(CHUNK_MATRIX_SIZE - 1, allFullCopies_);
		allFullChunkCopies[limitedGoal][limitedAll]++;
	}
#endif
};

constexpr int Chunk::kMaxStatCount;

#ifndef METARESTORE

std::deque<Chunk *> Chunk::endangeredChunks;
GoalCache Chunk::goalCache(10000);
ChunksAvailabilityState Chunk::allChunksAvailability;
ChunksReplicationState Chunk::allChunksReplicationState;
uint64_t Chunk::count;
uint64_t Chunk::allFullChunkCopies[CHUNK_MATRIX_SIZE][CHUNK_MATRIX_SIZE];
#endif

#define CHUNK_BUCKET_SIZE 20000
struct chunk_bucket {
	Chunk bucket[CHUNK_BUCKET_SIZE];
	uint32_t firstfree;
	chunk_bucket *next;
};
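
// Chunks are pool-allocated: chunk_malloc() below carves Chunk objects out of
// chunk_bucket arrays of CHUNK_BUCKET_SIZE, and chunk_free() threads released
// chunks onto a free list (reusing Chunk::next as the link). Individual chunks
// are never returned to the heap; whole buckets are freed only in
// ~ChunksMetadata().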

namespace {
struct ChunksMetadata {
	// chunks
	chunk_bucket *cbhead;
	Chunk *chfreehead;
	Chunk *chunkhash[HASHSIZE];
	uint64_t lastchunkid;
	Chunk *lastchunkptr;

	// other chunks metadata information
	uint64_t nextchunkid;
	uint64_t chunksChecksum;
	uint64_t chunksChecksumRecalculated;
	uint32_t checksumRecalculationPosition;

	ChunksMetadata() :
			cbhead{},
			chfreehead{},
			chunkhash{},
			lastchunkid{},
			lastchunkptr{},
			nextchunkid{1},
			chunksChecksum{},
			chunksChecksumRecalculated{},
			checksumRecalculationPosition{0} {
	}

	~ChunksMetadata() {
		chunk_bucket *cbn;
		for (chunk_bucket *cb = cbhead; cb; cb = cbn) {
			cbn = cb->next;
			delete cb;
		}
	}
};
} // anonymous namespace

static ChunksMetadata *gChunksMetadata;

#define LOCKTIMEOUT 120
#define UNUSED_DELETE_TIMEOUT (86400*7)
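
// Both timeouts are in seconds: LOCKTIMEOUT is the write-lock lease granted to
// a client by chunk_multi_modify()/chunk_multi_truncate(), while
// UNUSED_DELETE_TIMEOUT (7 days) is how long a chunk reported by a chunkserver
// but unknown to the metadata stays locked before it may be deleted.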

#ifndef METARESTORE

static Chunk *gCurrentChunkInZombieLoop = nullptr;

class ReplicationDelayInfo {
public:
	ReplicationDelayInfo()
		: disconnectedServers_(0),
		  timestamp_() {}

	void serverDisconnected() {
		refresh();
		++disconnectedServers_;
		timestamp_ = eventloop_time() + gOperationsDelayDisconnect;
	}

	void serverConnected() {
		refresh();
		if (disconnectedServers_ > 0) {
			--disconnectedServers_;
		}
	}

	bool replicationAllowed(int missingCopies) {
		refresh();
		return missingCopies > disconnectedServers_;
	}

private:
	uint16_t disconnectedServers_;
	uint32_t timestamp_;

	void refresh() {
		if (eventloop_time() > timestamp_) {
			disconnectedServers_ = 0;
		}
	}

};
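
// Rationale: when a chunkserver disconnects, every copy it held looks missing,
// but the server may come right back (e.g. after a restart). For
// gOperationsDelayDisconnect seconds replicationAllowed() therefore approves
// replication only for chunks missing more copies than the number of recently
// disconnected servers; a chunk that lost a single copy to a single disconnect
// waits, while a chunk in a worse state is replicated immediately.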

/*
 * Information about recently disconnected and connected servers
 * necessary for replication to unlabeled servers.
 */
static ReplicationDelayInfo replicationDelayInfoForAll;

/*
 * Information about recently disconnected and connected servers
 * necessary for replication to servers with specified label.
 */
static std::unordered_map<MediaLabel, ReplicationDelayInfo, MediaLabel::hash> replicationDelayInfoForLabel;

struct job_info {
	uint32_t del_invalid;
	uint32_t del_unused;
	uint32_t del_diskclean;
	uint32_t del_overgoal;
	uint32_t copy_undergoal;
};

struct loop_info {
	job_info done,notdone;
	uint32_t copy_rebalance;
};

static loop_info chunksinfo = {{0,0,0,0,0},{0,0,0,0,0},0};
static uint32_t chunksinfo_loopstart=0,chunksinfo_loopend=0;

static uint32_t stats_deletions=0;
static uint32_t stats_replications=0;

void chunk_stats(uint32_t *del,uint32_t *repl) {
	*del = stats_deletions;
	*repl = stats_replications;
	stats_deletions = 0;
	stats_replications = 0;
}

#endif // ! METARESTORE

static uint64_t chunk_checksum(const Chunk *c) {
	if (c == nullptr || c->fileCount() == 0) {
		// We treat chunks with fileCount=0 as non-existent, so that we don't have to notify shadow
		// masters when we remove them from our structures.
		return 0;
	}
	uint64_t checksum = 64517419147637ULL;
	// Only highest id goal is taken into checksum for compatibility reasons
	hashCombine(checksum, c->chunkid, c->version, c->lockedto, c->highestIdGoal(), c->fileCount());

	return checksum;
}

static void chunk_checksum_add_to_background(Chunk *ch) {
	if (!ch) {
		return;
	}
	removeFromChecksum(gChunksMetadata->chunksChecksum, ch->checksum);
	ch->checksum = chunk_checksum(ch);
	addToChecksum(gChunksMetadata->chunksChecksumRecalculated, ch->checksum);
	addToChecksum(gChunksMetadata->chunksChecksum, ch->checksum);
}

static void chunk_update_checksum(Chunk *ch) {
	if (!ch) {
		return;
	}
	if (HASHPOS(ch->chunkid) < gChunksMetadata->checksumRecalculationPosition) {
		removeFromChecksum(gChunksMetadata->chunksChecksumRecalculated, ch->checksum);
	}
	removeFromChecksum(gChunksMetadata->chunksChecksum, ch->checksum);
	ch->checksum = chunk_checksum(ch);
	if (HASHPOS(ch->chunkid) < gChunksMetadata->checksumRecalculationPosition) {
		lzfs_silent_syslog(LOG_DEBUG, "master.fs.checksum.changing_recalculated_chunk");
		addToChecksum(gChunksMetadata->chunksChecksumRecalculated, ch->checksum);
	} else {
		lzfs_silent_syslog(LOG_DEBUG, "master.fs.checksum.changing_not_recalculated_chunk");
	}
	addToChecksum(gChunksMetadata->chunksChecksum, ch->checksum);
}
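
// Two running checksums are kept while a background recalculation is in
// progress: chunksChecksum covers all chunks, chunksChecksumRecalculated only
// the hash buckets below checksumRecalculationPosition. chunk_update_checksum()
// therefore patches both sums for chunks whose bucket was already rescanned,
// so the final comparison in chunks_update_checksum_a_bit() stays valid even
// when chunks change mid-recalculation.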

/*!
 * \brief update chunks checksum in the background
 * \param speedLimit limit of chunks processed per call
 * \return info whether all chunks were updated or not.
 */
ChecksumRecalculationStatus chunks_update_checksum_a_bit(uint32_t speedLimit) {
	if (gChunksMetadata->checksumRecalculationPosition == 0) {
		gChunksMetadata->chunksChecksumRecalculated = CHECKSUMSEED;
	}
	uint32_t recalculated = 0;
	while (gChunksMetadata->checksumRecalculationPosition < HASHSIZE) {
		Chunk *c;
		for (c = gChunksMetadata->chunkhash[gChunksMetadata->checksumRecalculationPosition]; c; c=c->next) {
			chunk_checksum_add_to_background(c);
			++recalculated;
		}
		++gChunksMetadata->checksumRecalculationPosition;
		if (recalculated >= speedLimit) {
			return ChecksumRecalculationStatus::kInProgress;
		}
	}
	// Recalculating chunks checksum finished
	gChunksMetadata->checksumRecalculationPosition = 0;
	if (gChunksMetadata->chunksChecksum != gChunksMetadata->chunksChecksumRecalculated) {
		lzfs_pretty_syslog(LOG_WARNING,"Chunks metadata checksum mismatch found, replacing with a new value.");
		lzfs_silent_syslog(LOG_DEBUG, "master.fs.checksum.mismatch");
		gChunksMetadata->chunksChecksum = gChunksMetadata->chunksChecksumRecalculated;
	}
	return ChecksumRecalculationStatus::kDone;
}

static void chunk_recalculate_checksum() {
	gChunksMetadata->chunksChecksum = CHECKSUMSEED;
	for (int i = 0; i < HASHSIZE; ++i) {
		for (Chunk *ch = gChunksMetadata->chunkhash[i]; ch; ch = ch->next) {
			ch->checksum = chunk_checksum(ch);
			addToChecksum(gChunksMetadata->chunksChecksum, ch->checksum);
		}
	}
}

uint64_t chunk_checksum(ChecksumMode mode) {
	uint64_t checksum = 46586918175221;
	addToChecksum(checksum, gChunksMetadata->nextchunkid);
	if (mode == ChecksumMode::kForceRecalculate) {
		chunk_recalculate_checksum();
	}
	addToChecksum(checksum, gChunksMetadata->chunksChecksum);
	return checksum;
}

static inline Chunk *chunk_malloc() {
	chunk_bucket *cb;
	Chunk *ret;
	if (gChunksMetadata->chfreehead) {
		ret = gChunksMetadata->chfreehead;
		gChunksMetadata->chfreehead = ret->next;
		ret->clear();
		return ret;
	}
	if (gChunksMetadata->cbhead==NULL || gChunksMetadata->cbhead->firstfree==CHUNK_BUCKET_SIZE) {
		cb = new chunk_bucket;
		cb->next = gChunksMetadata->cbhead;
		cb->firstfree = 0;
		gChunksMetadata->cbhead = cb;
	}
	ret = (gChunksMetadata->cbhead->bucket)+(gChunksMetadata->cbhead->firstfree);
	gChunksMetadata->cbhead->firstfree++;
	ret->clear();
	return ret;
}

#ifndef METARESTORE
static inline void chunk_free(Chunk *p) {
	p->next = gChunksMetadata->chfreehead;
	gChunksMetadata->chfreehead = p;
	p->inEndangeredQueue = 0;
}
#endif /* METARESTORE */

Chunk *chunk_new(uint64_t chunkid, uint32_t chunkversion) {
	uint32_t chunkpos = HASHPOS(chunkid);
	Chunk *newchunk;
	newchunk = chunk_malloc();
	newchunk->next = gChunksMetadata->chunkhash[chunkpos];
	gChunksMetadata->chunkhash[chunkpos] = newchunk;
	newchunk->chunkid = chunkid;
	newchunk->version = chunkversion;
	gChunksMetadata->lastchunkid = chunkid;
	gChunksMetadata->lastchunkptr = newchunk;
	chunk_update_checksum(newchunk);
	return newchunk;
}

#ifndef METARESTORE
void chunk_emergency_increase_version(Chunk *c) {
	assert(c->isWritable());
	uint32_t i = 0;
	for (auto &part : c->parts) {
		if (part.is_valid()) {
			if (!part.is_busy()) {
				part.mark_busy();
			}
			part.version = c->version+1;
			matocsserv_send_setchunkversion(part.server(),c->chunkid,c->version+1,c->version,
					part.type);
			i++;
		}
	}
	c->interrupted = 0;
	c->operation = Chunk::SET_VERSION;
	c->version++;
	chunk_update_checksum(c);
	fs_incversion(c->chunkid);
}

void chunk_handle_disconnected_copies(Chunk *c) {
	auto it = std::remove_if(c->parts.begin(), c->parts.end(), [](const ChunkPart &part) {
		return csdb_find(part.csid)->eptr == nullptr;
	});
	bool lost_copy_found = it != c->parts.end();

	if (lost_copy_found) {
		c->parts.erase(it, c->parts.end());
		c->needverincrease = 1;
		c->updateStats();
	}

	if (lost_copy_found && c->operation != Chunk::NONE) {
		bool any_copy_busy = std::any_of(c->parts.begin(), c->parts.end(), [](const ChunkPart &part) {
			return part.is_busy();
		});
		if (any_copy_busy) {
			c->interrupted = 1;
		} else {
			if (c->isWritable()) {
				chunk_emergency_increase_version(c);
			} else {
				matoclserv_chunk_status(c->chunkid,LIZARDFS_ERROR_NOTDONE);
				c->operation = Chunk::NONE;
			}
		}
	}
}
#endif

Chunk *chunk_find(uint64_t chunkid) {
	uint32_t chunkpos = HASHPOS(chunkid);
	Chunk *chunkit;
	if (gChunksMetadata->lastchunkid==chunkid) {
		return gChunksMetadata->lastchunkptr;
	}
	for (chunkit = gChunksMetadata->chunkhash[chunkpos] ; chunkit ; chunkit = chunkit->next) {
		if (chunkit->chunkid == chunkid) {
			gChunksMetadata->lastchunkid = chunkid;
			gChunksMetadata->lastchunkptr = chunkit;
#ifndef METARESTORE
			chunk_handle_disconnected_copies(chunkit);
#endif // METARESTORE
			return chunkit;
		}
	}
	return NULL;
}
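
// Note: besides the one-entry (lastchunkid/lastchunkptr) lookup cache,
// chunk_find() also lazily prunes copies that resided on already disconnected
// chunkservers, so callers always see an up-to-date parts list.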

#ifndef METARESTORE
void chunk_delete(Chunk *c) {
	if (gChunksMetadata->lastchunkptr==c) {
		gChunksMetadata->lastchunkid=0;
		gChunksMetadata->lastchunkptr=NULL;
	}
	c->freeStats();
	chunk_free(c);
}

uint32_t chunk_count(void) {
	return Chunk::count;
}

void chunk_info(uint32_t *allchunks,uint32_t *allcopies,uint32_t *regularvalidcopies) {
	*allchunks = Chunk::count;
	*allcopies = 0;
	*regularvalidcopies = 0;
	for (int actualCopies = 1; actualCopies < CHUNK_MATRIX_SIZE; actualCopies++) {
		uint32_t ag = 0;
		for (int expectedCopies = 0; expectedCopies < CHUNK_MATRIX_SIZE; expectedCopies++) {
			ag += Chunk::allFullChunkCopies[expectedCopies][actualCopies];
		}
		*allcopies += ag * actualCopies;
	}
}

uint32_t chunk_get_missing_count(void) {
	uint32_t res = 0;
	for (uint8_t goal = GoalId::kMin; goal <= GoalId::kMax; ++goal) {
		res += Chunk::allChunksAvailability.lostChunks(goal);
	}
	return res;
}

void chunk_store_chunkcounters(uint8_t *buff,uint8_t matrixid) {
	if (matrixid == MATRIX_ALL_COPIES) {
		for (int i = 0; i < CHUNK_MATRIX_SIZE; i++) {
			for (int j = 0; j < CHUNK_MATRIX_SIZE; j++) {
				put32bit(&buff, Chunk::allFullChunkCopies[i][j]);
			}
		}
	} else {
		memset(buff, 0, CHUNK_MATRIX_SIZE * CHUNK_MATRIX_SIZE * sizeof(uint32_t));
	}
}
#endif

/// updates chunk's goal after a file goal has been changed
int chunk_change_file(uint64_t chunkid,uint8_t prevgoal,uint8_t newgoal) {
	Chunk *c;
	if (prevgoal==newgoal) {
		return LIZARDFS_STATUS_OK;
	}
	c = chunk_find(chunkid);
	if (c==NULL) {
		return LIZARDFS_ERROR_NOCHUNK;
	}
	try {
		c->changeFileGoal(prevgoal, newgoal);
	} catch (Exception& ex) {
		lzfs_pretty_syslog(LOG_WARNING, "chunk_change_file: %s", ex.what());
		return LIZARDFS_ERROR_CHUNKLOST;
	}
	chunk_update_checksum(c);
	return LIZARDFS_STATUS_OK;
}

/// updates chunk's goal after a file with goal `goal' has been removed
static inline int chunk_delete_file_int(Chunk *c, uint8_t goal) {
	try {
		c->removeFileWithGoal(goal);
	} catch (Exception& ex) {
		lzfs_pretty_syslog(LOG_WARNING, "chunk_delete_file_int: %s", ex.what());
		return LIZARDFS_ERROR_CHUNKLOST;
	}
	chunk_update_checksum(c);
	return LIZARDFS_STATUS_OK;
}

/// updates chunk's goal after a file with goal `goal' has been added
static inline int chunk_add_file_int(Chunk *c, uint8_t goal) {
	try {
		c->addFileWithGoal(goal);
	} catch (Exception& ex) {
		lzfs_pretty_syslog(LOG_WARNING, "chunk_add_file_int: %s", ex.what());
		return LIZARDFS_ERROR_CHUNKLOST;
	}
	chunk_update_checksum(c);
	return LIZARDFS_STATUS_OK;
}

int chunk_delete_file(uint64_t chunkid,uint8_t goal) {
	Chunk *c;
	c = chunk_find(chunkid);
	if (c==NULL) {
		return LIZARDFS_ERROR_NOCHUNK;
	}
	return chunk_delete_file_int(c,goal);
}

int chunk_add_file(uint64_t chunkid,uint8_t goal) {
	Chunk *c;
	c = chunk_find(chunkid);
	if (c==NULL) {
		return LIZARDFS_ERROR_NOCHUNK;
	}
	return chunk_add_file_int(c,goal);
}

int chunk_can_unlock(uint64_t chunkid, uint32_t lockid) {
	Chunk *c;
	c = chunk_find(chunkid);
	if (c==NULL) {
		return LIZARDFS_ERROR_NOCHUNK;
	}
	if (lockid == 0) {
		// lockid == 0 -> force unlock
		return LIZARDFS_STATUS_OK;
	}
	// We will let the client unlock the chunk even if c->lockedto < eventloop_time(),
	// provided it passes the lockid that was used to lock the chunk -- this means that nobody
	// else used this chunk since it was locked (operations like truncate or replicate
	// would remove such a stale lock before modifying the chunk)
	if (c->lockid == lockid) {
		return LIZARDFS_STATUS_OK;
	} else if (c->lockedto == 0) {
		return LIZARDFS_ERROR_NOTLOCKED;
	} else {
		return LIZARDFS_ERROR_WRONGLOCKID;
	}
}

int chunk_unlock(uint64_t chunkid) {
	Chunk *c;
	c = chunk_find(chunkid);
	if (c==NULL) {
		return LIZARDFS_ERROR_NOCHUNK;
	}
	// Don't remove lockid to safely accept retransmission of FUSE_CHUNK_UNLOCK message
	c->lockedto = 0;
	chunk_update_checksum(c);
	return LIZARDFS_STATUS_OK;
}

#ifndef METARESTORE

int chunk_invalidate_goal_cache(){
	Chunk::goalCache.clear();
	return LIZARDFS_STATUS_OK;
}

bool chunk_has_only_invalid_copies(uint64_t chunkid) {
	if (chunkid == 0) {
		return false;
	}
	Chunk *c = chunk_find(chunkid);
	if (c == NULL || !c->isLost()) {
		return false;
	}
	// Chunk is lost, so it can only have INVALID or DEL copies.
	// Return true if there is at least one INVALID.
	return std::any_of(c->parts.begin(), c->parts.end(), [](const ChunkPart& part) {
		return part.state == ChunkPart::INVALID;
	});
}

int chunk_get_fullcopies(uint64_t chunkid,uint8_t *vcopies) {
	Chunk *c;
	*vcopies = 0;
	c = chunk_find(chunkid);
	if (c==NULL) {
		return LIZARDFS_ERROR_NOCHUNK;
	}

	*vcopies = c->getFullCopiesCount();

	return LIZARDFS_STATUS_OK;
}

int chunk_get_partstomodify(uint64_t chunkid, int &recover, int &remove) {
	Chunk *c;
	recover = 0;
	remove = 0;
	c = chunk_find(chunkid);
	if (c==NULL) {
		return LIZARDFS_ERROR_NOCHUNK;
	}
	recover = c->countMissingParts();
	remove = c->countRedundantParts();
	return LIZARDFS_STATUS_OK;
}

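/*!
 * \brief Prepare a chunk for modification (write) on behalf of a client.
 *
 * For ochunkid == 0 a brand new chunk is allocated on a set of chunkservers.
 * For an existing chunk, if only one file references it, its version is
 * bumped in place; otherwise a copy-on-write duplicate is created
 * (Chunk::DUPLICATE). On success the chunk is locked for LOCKTIMEOUT seconds
 * and *lockid, *nchunkid and *opflag describe the resulting lock, the chunk
 * id, and whether a chunkserver operation was started.
 */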
uint8_t chunk_multi_modify(uint64_t ochunkid, uint32_t *lockid, uint8_t goal,
		bool usedummylockid, bool quota_exceeded, uint8_t *opflag, uint64_t *nchunkid,
		uint32_t min_server_version = 0) {
	Chunk *c = NULL;
	if (ochunkid == 0) { // new chunk
		if (quota_exceeded) {
			return LIZARDFS_ERROR_QUOTA;
		}
		auto serversWithChunkTypes = matocsserv_getservers_for_new_chunk(goal, min_server_version);
		if (serversWithChunkTypes.empty()) {
			uint16_t uscount,tscount;
			double minusage,maxusage;
			matocsserv_usagedifference(&minusage,&maxusage,&uscount,&tscount);
			if ((uscount > 0) && (eventloop_time() > (starttime+600))) { // if there are chunkservers and at least 10 minutes have passed since start, then it means that there is no space left
				return LIZARDFS_ERROR_NOSPACE;
			} else {
				return LIZARDFS_ERROR_NOCHUNKSERVERS;
			}
		}
		ChunkCopiesCalculator calculator(fs_get_goal_definition(goal));
		for (const auto &server_with_type : serversWithChunkTypes) {
			calculator.addPart(server_with_type.second, MediaLabel::kWildcard);
		}
		calculator.evalRedundancyLevel();
		if (!calculator.isSafeEnoughToWrite(gRedundancyLevel)) {
			return LIZARDFS_ERROR_NOCHUNKSERVERS;
		}
		c = chunk_new(gChunksMetadata->nextchunkid++, 1);
		c->interrupted = 0;
		c->operation = Chunk::CREATE;
		chunk_add_file_int(c,goal);
		for (const auto &server_with_type : serversWithChunkTypes) {
			c->parts.push_back(ChunkPart(matocsserv_get_csdb(server_with_type.first)->csid,
			                             ChunkPart::BUSY, c->version, server_with_type.second));
			matocsserv_send_createchunk(server_with_type.first, c->chunkid, server_with_type.second,
			                            c->version);
		}
		c->updateStats();
		*opflag=1;
		*nchunkid = c->chunkid;
	} else {
		Chunk *oc = chunk_find(ochunkid);
		if (oc==NULL) {
			return LIZARDFS_ERROR_NOCHUNK;
		}
		if (*lockid != 0 && *lockid != oc->lockid) {
			if (oc->lockid == 0 || oc->lockedto == 0) {
				// Lock was removed by some chunk operation or by a different client
				return LIZARDFS_ERROR_NOTLOCKED;
			} else {
				return LIZARDFS_ERROR_WRONGLOCKID;
			}
		}
		if (*lockid == 0 && oc->isLocked()) {
			return LIZARDFS_ERROR_LOCKED;
		}
		if (!oc->isWritable()) {
			return LIZARDFS_ERROR_CHUNKLOST;
		}
		ChunkCopiesCalculator calculator(oc->getGoal());
		for (auto &part : oc->parts) {
			calculator.addPart(part.type, MediaLabel::kWildcard);
		}
		calculator.evalRedundancyLevel();
		if (!calculator.isSafeEnoughToWrite(gRedundancyLevel)) {
			return LIZARDFS_ERROR_NOCHUNKSERVERS;
		}

		if (oc->fileCount() == 1) { // refcount==1
			*nchunkid = ochunkid;
			c = oc;
			if (c->operation != Chunk::NONE) {
				return LIZARDFS_ERROR_CHUNKBUSY;
			}
			if (c->needverincrease) {
				assert(c->isWritable());
				for (auto &part : c->parts) {
					if (part.is_valid()) {
						if (!part.is_busy()) {
							part.mark_busy();
						}
						part.version = c->version+1;
						matocsserv_send_setchunkversion(part.server(), ochunkid, c->version+1, c->version,
								part.type);
					}
				}
				c->interrupted = 0;
				c->operation = Chunk::SET_VERSION;
				c->version++;
				*opflag=1;
			} else {
				*opflag=0;
			}
		} else {
			if (oc->fileCount() == 0) { // it's serious structure error
				lzfs_pretty_syslog(LOG_WARNING,"serious structure inconsistency: (chunkid:%016" PRIX64 ")",ochunkid);
				return LIZARDFS_ERROR_CHUNKLOST; // ERROR_STRUCTURE
			}
			if (quota_exceeded) {
				return LIZARDFS_ERROR_QUOTA;
			}
			assert(oc->isWritable());
			c = chunk_new(gChunksMetadata->nextchunkid++, 1);
			c->interrupted = 0;
			c->operation = Chunk::DUPLICATE;
			chunk_delete_file_int(oc,goal);
			chunk_add_file_int(c,goal);
			for (const auto &old_part : oc->parts) {
				if (old_part.is_valid()) {
					c->parts.push_back(ChunkPart(old_part.csid, ChunkPart::BUSY, c->version, old_part.type));
					matocsserv_send_duplicatechunk(old_part.server(), c->chunkid, c->version, old_part.type,
							oc->chunkid, oc->version);
				}
			}
			c->updateStats();
			*nchunkid = c->chunkid;
			*opflag=1;
		}
	}

	c->lockedto = eventloop_time() + LOCKTIMEOUT;
	if (*lockid == 0) {
		if (usedummylockid) {
			*lockid = 1;
		} else {
			*lockid = 2 + rnd_ranged<uint32_t>(0xFFFFFFF0); // some random number greater than 1
		}
	}
	c->lockid = *lockid;
	chunk_update_checksum(c);
	return LIZARDFS_STATUS_OK;
}
1169 
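/*!
 * \brief Prepare a chunk for truncation, following the same copy-on-write
 * scheme as chunk_multi_modify(): truncate in place (Chunk::TRUNCATE) when
 * only one file references the chunk, otherwise duplicate-and-truncate
 * (Chunk::DUPTRUNC) into a fresh chunk. Unlike plain modification, truncation
 * may be refused upfront when the chunk has parity parts and
 * denyTruncatingParityParts is set.
 */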
uint8_t chunk_multi_truncate(uint64_t ochunkid, uint32_t lockid, uint32_t length,
		uint8_t goal, bool denyTruncatingParityParts, bool quota_exceeded, uint64_t *nchunkid) {
	Chunk *oc, *c;

	c=NULL;
	oc = chunk_find(ochunkid);
	if (oc==NULL) {
		return LIZARDFS_ERROR_NOCHUNK;
	}
	if (!oc->isWritable()) {
		return LIZARDFS_ERROR_CHUNKLOST;
	}
	if (oc->isLocked() && (lockid == 0 || lockid != oc->lockid)) {
		return LIZARDFS_ERROR_LOCKED;
	}
	if (denyTruncatingParityParts) {
		for (const auto &part : oc->parts) {
			if (slice_traits::isParityPart(part.type)) {
				return LIZARDFS_ERROR_NOTPOSSIBLE;
			}
		}
	}
	if (oc->fileCount() == 1) { // refcount==1
		*nchunkid = ochunkid;
		c = oc;
		if (c->operation != Chunk::NONE) {
			return LIZARDFS_ERROR_CHUNKBUSY;
		}
		assert(c->isWritable());
		for (auto &part : c->parts) {
			if (part.is_valid()) {
				if (!part.is_busy()) {
					part.mark_busy();
				}
				part.version = c->version+1;
				uint32_t chunkTypeLength =
						slice_traits::chunkLengthToChunkPartLength(part.type, length);
				matocsserv_send_truncatechunk(part.server(), ochunkid, part.type, chunkTypeLength,
						c->version + 1, c->version);
			}
		}
		c->interrupted = 0;
		c->operation = Chunk::TRUNCATE;
		c->version++;
	} else {
		if (oc->fileCount() == 0) { // it's serious structure error
			lzfs_pretty_syslog(LOG_WARNING,"serious structure inconsistency: (chunkid:%016" PRIX64 ")",ochunkid);
			return LIZARDFS_ERROR_CHUNKLOST; // ERROR_STRUCTURE
		}
		if (quota_exceeded) {
			return LIZARDFS_ERROR_QUOTA;
		}

		assert(oc->isWritable());
		c = chunk_new(gChunksMetadata->nextchunkid++, 1);
		c->interrupted = 0;
		c->operation = Chunk::DUPTRUNC;
		chunk_delete_file_int(oc,goal);
		chunk_add_file_int(c,goal);
		for (const auto &old_part : oc->parts) {
			if (old_part.is_valid()) {
				c->parts.push_back(ChunkPart(old_part.csid, ChunkPart::BUSY, c->version, old_part.type));
				matocsserv_send_duptruncchunk(old_part.server(), c->chunkid, c->version,
						old_part.type, oc->chunkid, oc->version,
						slice_traits::chunkLengthToChunkPartLength(old_part.type, length));
			}
		}
		c->updateStats();
		*nchunkid = c->chunkid;
	}

	c->lockedto=(uint32_t)eventloop_time()+LOCKTIMEOUT;
	c->lockid = lockid;
	chunk_update_checksum(c);
	return LIZARDFS_STATUS_OK;
}
#endif // ! METARESTORE

uint8_t chunk_apply_modification(uint32_t ts, uint64_t oldChunkId, uint32_t lockid, uint8_t goal,
		bool doIncreaseVersion, uint64_t *newChunkId) {
	Chunk *c;
	if (oldChunkId == 0) { // new chunk
		c = chunk_new(gChunksMetadata->nextchunkid++, 1);
		chunk_add_file_int(c, goal);
	} else {
		Chunk *oc = chunk_find(oldChunkId);
		if (oc == NULL) {
			return LIZARDFS_ERROR_NOCHUNK;
		}
		if (oc->fileCount() == 0) { // refcount == 0
			lzfs_pretty_syslog(LOG_WARNING,
					"serious structure inconsistency: (chunkid:%016" PRIX64 ")", oldChunkId);
			return LIZARDFS_ERROR_CHUNKLOST; // ERROR_STRUCTURE
		} else if (oc->fileCount() == 1) { // refcount == 1
			c = oc;
			if (doIncreaseVersion) {
				c->version++;
			}
		} else {
			c = chunk_new(gChunksMetadata->nextchunkid++, 1);
			chunk_delete_file_int(oc, goal);
			chunk_add_file_int(c, goal);
		}
	}
	c->lockedto = ts + LOCKTIMEOUT;
	c->lockid = lockid;
	chunk_update_checksum(c);
	*newChunkId = c->chunkid;
	return LIZARDFS_STATUS_OK;
}

#ifndef METARESTORE
int chunk_repair(uint8_t goal, uint64_t ochunkid, uint32_t *nversion, uint8_t correct_only) {
	uint32_t best_version;
	Chunk *c;

	*nversion=0;
	if (ochunkid==0) {
		return 0; // not changed
	}

	c = chunk_find(ochunkid);
	if (c==NULL) { // no such chunk - erase (nchunkid already is 0 - so just return with "changed" status)
		if (correct_only == 1) { // don't erase if correct only flag is set
			return 0;
		} else {
			return 1;
		}
	}
	if (c->isLocked()) { // can't repair locked chunks - but if it's locked, then likely it doesn't need to be repaired
		return 0;
	}

	// calculators will be sorted by decreasing keys, so highest version will be first.
	std::map<uint32_t, ChunkCopiesCalculator, std::greater<uint32_t>> calculators;
	best_version = 0;
	for (const auto &part : c->parts) {
		// ignore chunks which are being deleted
		if (part.state != ChunkPart::DEL) {
			ChunkCopiesCalculator &calculator = calculators[part.version];
			calculator.addPart(part.type, matocsserv_get_label(part.server()));
		}
	}
	// find best version which can be recovered
	// calculators are sorted by decreasing keys, so highest version will be first.
	for (auto &version_and_calculator : calculators) {
		uint32_t version = version_and_calculator.first;
		ChunkCopiesCalculator &calculator = version_and_calculator.second;
		calculator.optimize();
		// calculator.isRecoveryPossible() won't work below, because target goal is empty.
		if (calculator.getFullCopiesCount() > 0) {
			best_version = version;
			break;
		}
	}
	// current version is readable
	if (best_version == c->version) {
		return 0;
	}
	// didn't find sensible chunk
	if (best_version == 0) {
		if (correct_only == 1) { // don't erase if correct only flag is set
			return 0;
		} else {                  // otherwise erase it
			chunk_delete_file_int(c, goal);
			return 1;
		}
	}
	// found previous version which is readable
	c->version = best_version;
	for (auto &part : c->parts) {
		if (part.state == ChunkPart::INVALID && part.version==best_version) {
			part.state = ChunkPart::VALID;
		}
	}
	*nversion = best_version;
	c->needverincrease=1;
	c->updateStats();
	chunk_update_checksum(c);
	return 1;
}
#endif

int chunk_set_version(uint64_t chunkid,uint32_t version) {
	Chunk *c;
	c = chunk_find(chunkid);
	if (c==NULL) {
		return LIZARDFS_ERROR_NOCHUNK;
	}
	c->version = version;
	chunk_update_checksum(c);
	return LIZARDFS_STATUS_OK;
}

int chunk_increase_version(uint64_t chunkid) {
	Chunk *c;
	c = chunk_find(chunkid);
	if (c==NULL) {
		return LIZARDFS_ERROR_NOCHUNK;
	}
	c->version++;
	chunk_update_checksum(c);
	return LIZARDFS_STATUS_OK;
}

uint8_t chunk_set_next_chunkid(uint64_t nextChunkIdToBeSet) {
	if (nextChunkIdToBeSet >= gChunksMetadata->nextchunkid) {
		gChunksMetadata->nextchunkid = nextChunkIdToBeSet;
		return LIZARDFS_STATUS_OK;
	} else {
		lzfs_pretty_syslog(LOG_WARNING,"was asked to increase the next chunk id to %" PRIu64 ", but it was "
				"already set to a bigger value %" PRIu64 ". Ignoring.",
				nextChunkIdToBeSet, gChunksMetadata->nextchunkid);
		return LIZARDFS_ERROR_MISMATCH;
	}
}

#ifndef METARESTORE

const ChunksReplicationState& chunk_get_replication_state() {
	return Chunk::allChunksReplicationState;
}

const ChunksAvailabilityState& chunk_get_availability_state() {
	return Chunk::allChunksAvailability;
}

struct ChunkLocation {
	ChunkLocation() : chunkType(slice_traits::standard::ChunkPartType()),
			chunkserver_version(0), distance(0), random(0) {
	}
	NetworkAddress address;
	ChunkPartType chunkType;
	uint32_t chunkserver_version;
	uint32_t distance;
	uint32_t random;
	MediaLabel label;
	bool operator<(const ChunkLocation& other) const {
		if (distance < other.distance) {
			return true;
		} else if (distance > other.distance) {
			return false;
		} else {
			return random < other.random;
		}
	}
};
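
// Locations are ordered by topology distance from the requesting client, with
// ties broken by a per-request random value, so equally distant chunkservers
// are returned in a different order each time and read load is spread across
// them.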

// TODO deduplicate
int chunk_getversionandlocations(uint64_t chunkid, uint32_t currentIp, uint32_t& version,
		uint32_t maxNumberOfChunkCopies, std::vector<ChunkTypeWithAddress>& serversList) {
	Chunk *c;
	uint8_t cnt;

	sassert(serversList.empty());
	c = chunk_find(chunkid);

	if (c == NULL) {
		return LIZARDFS_ERROR_NOCHUNK;
	}
	version = c->version;
	cnt = 0;
	std::vector<ChunkLocation> chunkLocation;
	ChunkLocation chunkserverLocation;
	for (const auto &part : c->parts) {
		if (part.is_valid()) {
			if (cnt < maxNumberOfChunkCopies && matocsserv_getlocation(part.server(),
					&(chunkserverLocation.address.ip),
					&(chunkserverLocation.address.port),
					&(chunkserverLocation.label)) == 0) {
				chunkserverLocation.chunkType = part.type;
				chunkserverLocation.chunkserver_version = matocsserv_get_version(part.server());
				chunkserverLocation.distance =
						topology_distance(chunkserverLocation.address.ip, currentIp);
						// in the future prepare more sophisticated distance function
				chunkserverLocation.random = rnd<uint32_t>();
				chunkLocation.push_back(chunkserverLocation);
				cnt++;
			}
		}
	}
	std::sort(chunkLocation.begin(), chunkLocation.end());
	for (uint32_t i = 0; i < chunkLocation.size(); ++i) {
		const ChunkLocation& loc = chunkLocation[i];
		serversList.emplace_back(loc.address, loc.chunkType, loc.chunkserver_version);
	}
	return LIZARDFS_STATUS_OK;
}

int chunk_getversionandlocations(uint64_t chunkid, uint32_t currentIp, uint32_t& version,
		uint32_t maxNumberOfChunkCopies, std::vector<ChunkPartWithAddressAndLabel>& serversList) {
	Chunk *c;
	uint8_t cnt;

	sassert(serversList.empty());
	c = chunk_find(chunkid);

	if (c == NULL) {
		return LIZARDFS_ERROR_NOCHUNK;
	}
	version = c->version;
	cnt = 0;
	std::vector<ChunkLocation> chunkLocation;
	ChunkLocation chunkserverLocation;
	for (const auto &part : c->parts) {
		if (part.is_valid()) {
			if (cnt < maxNumberOfChunkCopies && matocsserv_getlocation(part.server(),
					&(chunkserverLocation.address.ip),
					&(chunkserverLocation.address.port),
					&(chunkserverLocation.label)) == 0) {
				chunkserverLocation.chunkType = part.type;
				chunkserverLocation.distance =
						topology_distance(chunkserverLocation.address.ip, currentIp);
						// in the future prepare more sophisticated distance function
				chunkserverLocation.random = rnd<uint32_t>();
				chunkLocation.push_back(chunkserverLocation);
				cnt++;
			}
		}
	}
	std::sort(chunkLocation.begin(), chunkLocation.end());
	for (uint32_t i = 0; i < chunkLocation.size(); ++i) {
		const ChunkLocation& loc = chunkLocation[i];
		serversList.emplace_back(loc.address, static_cast<std::string>(loc.label), loc.chunkType);
	}
	return LIZARDFS_STATUS_OK;
}

void chunk_server_has_chunk(matocsserventry *ptr, uint64_t chunkid, uint32_t version, ChunkPartType chunkType) {
	Chunk *c;
	const uint32_t new_version = version & 0x7FFFFFFF;
	const bool todel = version & 0x80000000;
	c = chunk_find(chunkid);
	if (c==NULL) {
		// chunkserver has nonexistent chunk, so create it for future deletion
		if (chunkid>=gChunksMetadata->nextchunkid) {
			fs_set_nextchunkid(FsContext::getForMaster(eventloop_time()), chunkid + 1);
		}
		c = chunk_new(chunkid, new_version);
		c->lockedto = (uint32_t)eventloop_time()+UNUSED_DELETE_TIMEOUT;
		c->lockid = 0;
		chunk_update_checksum(c);
	}
	auto server_csid = matocsserv_get_csdb(ptr)->csid;
	for (auto &part : c->parts) {
		if (part.csid == server_csid && part.type == chunkType) {
			// This server already notified us about its copy.
			// We normally don't get repeated notifications about the same copy, but
			// they can arrive after chunkserver configuration reload (particularly,
			// when folders change their 'to delete' status) or due to bugs.
			// Let's try to handle them as well as we can.
			switch (part.state) {
			case ChunkPart::DEL:
				// We requested deletion, but the chunkserver 'has' this copy again.
				// Repeat deletion request.
				c->invalidateCopy(part);
				// fallthrough
			case ChunkPart::INVALID:
				// leave this copy alone
				return;
			default:
				break;
			}
			if (part.version != new_version) {
				lzfs_pretty_syslog(LOG_WARNING, "chunk %016" PRIX64 ": master data indicated "
						"version %08" PRIX32 ", chunkserver reports %08"
						PRIX32 "!!! Updating master data.", c->chunkid,
						part.version, new_version);
				part.version = new_version;
			}
			if (part.version != c->version) {
				c->markCopyAsHavingWrongVersion(part);
				return;
			}
			if (!part.is_todel() && todel) {
				part.mark_todel();
				c->updateStats();
			}
			if (part.is_todel() && !todel) {
				part.unmark_todel();
				c->updateStats();
			}
			return;
		}
	}
	const uint8_t state = (new_version == c->version) ? (todel ? ChunkPart::TDVALID : ChunkPart::VALID) : ChunkPart::INVALID;
	c->parts.push_back(ChunkPart(server_csid, state, new_version, chunkType));
	c->updateStats();
}

void chunk_damaged(matocsserventry *ptr, uint64_t chunkid, ChunkPartType chunk_type) {
	Chunk *c;
	c = chunk_find(chunkid);
	if (c == NULL) {
		// syslog(LOG_WARNING,"chunkserver has nonexistent chunk (%016" PRIX64 "), so create it for future deletion",chunkid);
		if (chunkid >= gChunksMetadata->nextchunkid) {
			gChunksMetadata->nextchunkid = chunkid + 1;
		}
		c = chunk_new(chunkid, 0);
	}
	auto server_csid = matocsserv_get_csdb(ptr)->csid;
	for (auto &part : c->parts) {
		if (part.csid == server_csid && part.type == chunk_type) {
			c->invalidateCopy(part);
			c->needverincrease=1;
			return;
		}
	}
	c->parts.push_back(ChunkPart(server_csid, ChunkPart::INVALID, 0, slice_traits::standard::ChunkPartType()));
	c->updateStats();
	c->needverincrease=1;
}

void chunk_lost(matocsserventry *ptr,uint64_t chunkid, ChunkPartType chunk_type) {
	Chunk *c = chunk_find(chunkid);
	if (c == nullptr) {
		return;
	}
	auto server_csid = matocsserv_get_csdb(ptr)->csid;
	auto it = std::remove_if(c->parts.begin(), c->parts.end(), [server_csid, chunk_type](const ChunkPart &part) {
		return part.csid == server_csid && part.type == chunk_type;
	});
	if (it != c->parts.end()) {
		c->parts.erase(it, c->parts.end());
		c->updateStats();
		c->needverincrease = 1;
	}
}

void chunk_server_disconnected(matocsserventry */*ptr*/, const MediaLabel &label) {
	replicationDelayInfoForAll.serverDisconnected();
	if (label != MediaLabel::kWildcard) {
		replicationDelayInfoForLabel[label].serverDisconnected();
	}
	// After a chunkserver disconnects, we can only be sure that all of its copies
	// have been processed by the zombie-server loop once that loop has run at least twice.
	gDisconnectedCounter = 2;
	eventloop_make_next_poll_nonblocking();
	fs_cs_disconnected();
	gChunksMetadata->lastchunkid = 0;
	gChunksMetadata->lastchunkptr = NULL;
}

void chunk_server_unlabelled_connected() {
	replicationDelayInfoForAll.serverConnected();
}

void chunk_server_label_changed(const MediaLabel &previousLabel, const MediaLabel &newLabel) {
	/*
	 * Only server with no label can be considered as newly connected
	 * and it was added to replicationDelayInfoForAll earlier
	 * in chunk_server_unlabelled_connected call.
	 */
	if (previousLabel == MediaLabel::kWildcard) {
		replicationDelayInfoForLabel[newLabel].serverConnected();
	}
}

/*
 * Called in every main loop iteration; incrementally scans the chunk hash table
 * and removes chunk parts that resided on disconnected (zombie) chunkservers.
 */
chunk_clean_zombie_servers_a_bit()1631 void chunk_clean_zombie_servers_a_bit() {
1632 	SignalLoopWatchdog watchdog;
1633 	static uint32_t current_position = HASHSIZE;
1634 
1635 	if (gDisconnectedCounter == 0) {
1636 		return;
1637 	}
1638 
1639 	watchdog.start();
1640 	while (current_position < HASHSIZE) {
1641 		for (; gCurrentChunkInZombieLoop; gCurrentChunkInZombieLoop = gCurrentChunkInZombieLoop->next) {
1642 			chunk_handle_disconnected_copies(gCurrentChunkInZombieLoop);
1643 			if (watchdog.expired()) {
1644 				eventloop_make_next_poll_nonblocking();
1645 				return;
1646 			}
1647 		}
1648 		++current_position;
1649 		if (current_position < HASHSIZE) {
1650 			gCurrentChunkInZombieLoop = gChunksMetadata->chunkhash[current_position];
1651 		}
1652 	}
1653 	if (current_position >= HASHSIZE) {
1654 		--gDisconnectedCounter;
1655 		current_position = 0;
1656 		gCurrentChunkInZombieLoop = gChunksMetadata->chunkhash[0];
1657 	}
1658 	eventloop_make_next_poll_nonblocking();
1659 }
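
/*
 * The function above is one instance of a pattern used throughout this file:
 * a long scan over the whole hash table is sliced into small pieces, and a
 * watchdog decides when to hand control back to the event loop. A minimal
 * sketch of the same idea, assuming only the start()/expired() interface
 * used above (Bucket, table and process are hypothetical):
 *
 *   struct Bucket { Bucket *next; };
 *   Bucket *table[1024];
 *   void process(Bucket *);
 *
 *   // Returns true once the whole table has been scanned; the static state
 *   // persists between calls, so the next call resumes where this one left off.
 *   bool scan_a_bit(SignalLoopWatchdog &watchdog) {
 *       static size_t pos = 0;
 *       static Bucket *node = table[0];
 *       watchdog.start();
 *       while (pos < 1024) {
 *           while (node) {
 *               process(node);
 *               node = node->next;  // advance before yielding, so nothing is redone
 *               if (watchdog.expired()) {
 *                   return false;   // yield; 'pos' and 'node' keep our place
 *               }
 *           }
 *           if (++pos < 1024) {
 *               node = table[pos];
 *           }
 *       }
 *       pos = 0;
 *       node = table[0];
 *       return true;
 *   }
 */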

void chunk_got_delete_status(matocsserventry *ptr, uint64_t chunkId, ChunkPartType chunkType, uint8_t /*status*/) {
	Chunk *c = chunk_find(chunkId);
	if (c == NULL) {
		return;
	}
	auto server_csid = matocsserv_get_csdb(ptr)->csid;
	auto it = std::remove_if(c->parts.begin(), c->parts.end(), [server_csid, chunkType](const ChunkPart& part) {
		if (part.csid == server_csid && part.type == chunkType) {
			if (part.state != ChunkPart::DEL) {
				lzfs_pretty_syslog(LOG_WARNING, "got unexpected delete status");
			}
			return true;
		}
		return false;
	});
	if (it != c->parts.end()) {
		c->parts.erase(it, c->parts.end());
		c->updateStats();
	}
}

void chunk_got_replicate_status(matocsserventry *ptr, uint64_t chunkId, uint32_t chunkVersion,
		ChunkPartType chunkType, uint8_t status) {
	Chunk *c = chunk_find(chunkId);
	if (c == NULL || status != 0) {
		return;
	}

	auto server_csid = matocsserv_get_csdb(ptr)->csid;
	for (auto &part : c->parts) {
		if (part.type == chunkType && part.csid == server_csid) {
			lzfs_pretty_syslog(LOG_WARNING,
					"got replication status from a server which already had that chunk (chunk:%016"
					PRIX64 "_%08" PRIX32 ")", chunkId, chunkVersion);
			if (part.state == ChunkPart::VALID && chunkVersion != c->version) {
				part.version = chunkVersion;
				c->markCopyAsHavingWrongVersion(part);
			}
			return;
		}
	}
	const uint8_t state = (c->isLocked() || chunkVersion != c->version) ? ChunkPart::INVALID : ChunkPart::VALID;
	c->parts.push_back(ChunkPart(server_csid, state, chunkVersion, chunkType));
	c->updateStats();
}

void chunk_operation_status(Chunk *c, ChunkPartType chunkType, uint8_t status, matocsserventry *ptr) {
	bool any_copy_busy = false;
	auto server_csid = matocsserv_get_csdb(ptr)->csid;
	for (auto &part : c->parts) {
		if (part.csid == server_csid && part.type == chunkType) {
			if (status != 0) {
				c->interrupted = 1; // increase version after finish, just in case
				c->invalidateCopy(part);
			} else {
				if (part.is_busy()) {
					part.unmark_busy();
				}
			}
		}
		any_copy_busy |= part.is_busy();
	}
	if (!any_copy_busy) {
		if (c->isWritable()) {
			if (c->interrupted) {
				chunk_emergency_increase_version(c);
			} else {
				matoclserv_chunk_status(c->chunkid, LIZARDFS_STATUS_OK);
				c->operation = Chunk::NONE;
				c->needverincrease = 0;
			}
		} else {
			matoclserv_chunk_status(c->chunkid, LIZARDFS_ERROR_NOTDONE);
			c->operation = Chunk::NONE;
		}
	}
}

void chunk_got_create_status(matocsserventry *ptr, uint64_t chunkId, ChunkPartType chunkType, uint8_t status) {
	Chunk *c;
	c = chunk_find(chunkId);
	if (c == NULL) {
		return;
	}
	chunk_operation_status(c, chunkType, status, ptr);
}

void chunk_got_duplicate_status(matocsserventry *ptr, uint64_t chunkId, ChunkPartType chunkType, uint8_t status) {
	Chunk *c;
	c = chunk_find(chunkId);
	if (c == NULL) {
		return;
	}
	chunk_operation_status(c, chunkType, status, ptr);
}

void chunk_got_setversion_status(matocsserventry *ptr, uint64_t chunkId, ChunkPartType chunkType, uint8_t status) {
	Chunk *c;
	c = chunk_find(chunkId);
	if (c == NULL) {
		return;
	}
	chunk_operation_status(c, chunkType, status, ptr);
}

void chunk_got_truncate_status(matocsserventry *ptr, uint64_t chunkid, ChunkPartType chunkType, uint8_t status) {
	Chunk *c;
	c = chunk_find(chunkid);
	if (c == NULL) {
		return;
	}
	chunk_operation_status(c, chunkType, status, ptr);
}

void chunk_got_duptrunc_status(matocsserventry *ptr, uint64_t chunkId, ChunkPartType chunkType, uint8_t status) {
	Chunk *c;
	c = chunk_find(chunkId);
	if (c == NULL) {
		return;
	}
	chunk_operation_status(c, chunkType, status, ptr);
}

/* ----------------------- */
/* JOBS (DELETE/REPLICATE) */
/* ----------------------- */

void chunk_store_info(uint8_t *buff) {
	put32bit(&buff, chunksinfo_loopstart);
	put32bit(&buff, chunksinfo_loopend);
	put32bit(&buff, chunksinfo.done.del_invalid);
	put32bit(&buff, chunksinfo.notdone.del_invalid);
	put32bit(&buff, chunksinfo.done.del_unused);
	put32bit(&buff, chunksinfo.notdone.del_unused);
	put32bit(&buff, chunksinfo.done.del_diskclean);
	put32bit(&buff, chunksinfo.notdone.del_diskclean);
	put32bit(&buff, chunksinfo.done.del_overgoal);
	put32bit(&buff, chunksinfo.notdone.del_overgoal);
	put32bit(&buff, chunksinfo.done.copy_undergoal);
	put32bit(&buff, chunksinfo.notdone.copy_undergoal);
	put32bit(&buff, chunksinfo.copy_rebalance);
}
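
/*
 * The buffer written above is a fixed 52-byte block: thirteen consecutive
 * 32-bit values in the exact order of the put32bit() calls (loop start time,
 * loop end time, done/notdone pairs for del_invalid, del_unused,
 * del_diskclean and del_overgoal, the copy_undergoal pair, and finally
 * copy_rebalance). Callers must provide at least that much space.
 */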

// jobs state: jobshpos

class ChunkWorker : public coroutine {
public:
	ChunkWorker();
	void doEveryLoopTasks();
	void doEverySecondTasks();
	void doChunkJobs(Chunk *c, uint16_t serverCount);
	void mainLoop();

private:
	typedef std::vector<ServerWithUsage> ServersWithUsage;

	struct MainLoopStack {
		uint32_t current_bucket;
		uint16_t usable_server_count;
		uint32_t chunks_done_count;
		uint32_t buckets_done_count;
		std::size_t endangered_to_serve;
		Chunk* node;
		Chunk* prev;
		ActiveLoopWatchdog work_limit;
		ActiveLoopWatchdog watchdog;
	};

	bool deleteUnusedChunks();

	uint32_t getMinChunkserverVersion(Chunk *c, ChunkPartType type);
	bool tryReplication(Chunk *c, ChunkPartType type, matocsserventry *destinationServer);

	void deleteInvalidChunkParts(Chunk *c);
	void deleteAllChunkParts(Chunk *c);
	bool replicateChunkPart(Chunk *c, Goal::Slice::Type slice_type, int slice_part, ChunkCopiesCalculator& calc, const IpCounter &ip_counter);
	bool removeUnneededChunkPart(Chunk *c, Goal::Slice::Type slice_type, int slice_part,
	                             ChunkCopiesCalculator& calc, const IpCounter &ip_counter);
	bool rebalanceChunkParts(Chunk *c, ChunkCopiesCalculator& calc, bool only_todel, const IpCounter &ip_counter);
	bool rebalanceChunkPartsWithSameIp(Chunk *c, ChunkCopiesCalculator &calc, const IpCounter &ip_counter);

	loop_info inforec_;
	uint32_t deleteNotDone_;
	uint32_t deleteDone_;
	uint32_t prevToDeleteCount_;
	uint32_t deleteLoopCount_;

	/// All chunkservers sorted by disk usage.
	ServersWithUsage sortedServers_;

	/// For each label, all servers with this label sorted by disk usage.
	std::map<MediaLabel, ServersWithUsage> labeledSortedServers_;

	MainLoopStack stack_;
};

ChunkWorker::ChunkWorker()
		: deleteNotDone_(0),
		  deleteDone_(0),
		  prevToDeleteCount_(0),
		  deleteLoopCount_(0) {
	memset(&inforec_, 0, sizeof(loop_info));
	stack_.current_bucket = 0;
}

void ChunkWorker::doEveryLoopTasks() {
	deleteLoopCount_++;
	if (deleteLoopCount_ >= 16) {
		uint32_t toDeleteCount = deleteDone_ + deleteNotDone_;
		deleteLoopCount_ = 0;
		if ((deleteNotDone_ > deleteDone_) && (toDeleteCount > prevToDeleteCount_)) {
			TmpMaxDelFrac *= 1.5;
			if (TmpMaxDelFrac > MaxDelHardLimit) {
				lzfs_pretty_syslog(LOG_NOTICE, "DEL_LIMIT hard limit (%" PRIu32 " per server) reached", MaxDelHardLimit);
				TmpMaxDelFrac = MaxDelHardLimit;
			}
			TmpMaxDel = TmpMaxDelFrac;
			lzfs_pretty_syslog(LOG_NOTICE, "DEL_LIMIT temporarily increased to: %" PRIu32 " per server", TmpMaxDel);
		}
		if ((toDeleteCount < prevToDeleteCount_) && (TmpMaxDelFrac > MaxDelSoftLimit)) {
			TmpMaxDelFrac /= 1.5;
			if (TmpMaxDelFrac < MaxDelSoftLimit) {
				lzfs_pretty_syslog(LOG_NOTICE, "DEL_LIMIT back to soft limit (%" PRIu32 " per server)", MaxDelSoftLimit);
				TmpMaxDelFrac = MaxDelSoftLimit;
			}
			TmpMaxDel = TmpMaxDelFrac;
			lzfs_pretty_syslog(LOG_NOTICE, "DEL_LIMIT decreased back to: %" PRIu32 " per server", TmpMaxDel);
		}
		prevToDeleteCount_ = toDeleteCount;
		deleteNotDone_ = 0;
		deleteDone_ = 0;
	}
	chunksinfo = inforec_;
	memset(&inforec_, 0, sizeof(inforec_));
	chunksinfo_loopstart = chunksinfo_loopend;
	chunksinfo_loopend = eventloop_time();
}

void ChunkWorker::doEverySecondTasks() {
	sortedServers_ = matocsserv_getservers_sorted();
	labeledSortedServers_.clear();
	for (const ServerWithUsage& sw : sortedServers_) {
		labeledSortedServers_[sw.label].push_back(sw);
	}
}

static bool chunkPresentOnServer(Chunk *c, matocsserventry *server) {
	auto server_csid = matocsserv_get_csdb(server)->csid;
	return std::any_of(c->parts.begin(), c->parts.end(), [server_csid](const ChunkPart &part) {
		return part.csid == server_csid;
	});
}

static bool chunkPresentOnServer(Chunk *c, Goal::Slice::Type slice_type, matocsserventry *server) {
	auto server_csid = matocsserv_get_csdb(server)->csid;
	return std::any_of(c->parts.begin(), c->parts.end(), [server_csid, slice_type](const ChunkPart &part) {
		return part.csid == server_csid && part.type.getSliceType() == slice_type;
	});
}

uint32_t ChunkWorker::getMinChunkserverVersion(Chunk */*c*/, ChunkPartType /*type*/) {
	return kFirstECVersion;
}

bool ChunkWorker::tryReplication(Chunk *c, ChunkPartType part_to_recover,
				matocsserventry *destination_server) {
	// TODO(msulikowski) Prefer VALID over TDVALID copies.
	std::vector<matocsserventry *> standard_servers;
	std::vector<matocsserventry *> all_servers;
	std::vector<ChunkPartType> all_parts;
	ChunkCopiesCalculator calc(c->getGoal());

	uint32_t destination_version = matocsserv_get_version(destination_server);

	assert(destination_version >= getMinChunkserverVersion(c, part_to_recover));
	for (const auto &part : c->parts) {
		if (!part.is_valid() || part.is_busy() || matocsserv_replication_read_counter(part.server()) >= MaxReadRepl) {
			continue;
		}

		if (slice_traits::isStandard(part.type)) {
			standard_servers.push_back(part.server());
		}

		if (destination_version >= kFirstXorVersion && destination_version < kFirstECVersion
			&& slice_traits::isXor(part_to_recover) && matocsserv_get_version(part.server()) < kFirstXorVersion) {
			continue;
		}

		if (destination_version < kFirstXorVersion && !slice_traits::isStandard(part.type)) {
			continue;
		}

		all_servers.push_back(part.server());
		all_parts.push_back(part.type);
		calc.addPart(part.type, matocsserv_get_label(part.server()));
	}

	calc.evalRedundancyLevel();
	if (!calc.isRecoveryPossible()) {
		return false;
	}

	if (destination_version >= kFirstECVersion ||
	    (destination_version >= kFirstXorVersion && slice_traits::isXor(part_to_recover))) {
		matocsserv_send_liz_replicatechunk(destination_server, c->chunkid, c->version,
		                                   part_to_recover, all_servers,
		                                   all_parts);
		stats_replications++;
		c->needverincrease = 1;
		return true;
	}

	// Fall back to legacy replication.
	assert(slice_traits::isStandard(part_to_recover));

	if (standard_servers.empty()) {
		return false;
	}

	matocsserv_send_replicatechunk(destination_server, c->chunkid, c->version,
	                               standard_servers[rnd_ranged<uint32_t>(standard_servers.size())]);

	stats_replications++;
	c->needverincrease = 1;
	return true;
}
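
/*
 * Version gating in tryReplication, summarized from the branches above
 * (versions refer to the destination chunkserver):
 *
 *   destination >= kFirstECVersion
 *       -> LizardFS-style replication; any slice type, all usable source
 *          parts are offered;
 *   kFirstXorVersion <= destination < kFirstECVersion, recovering an xor part
 *       -> LizardFS-style replication, but xor sources running versions
 *          older than kFirstXorVersion are skipped;
 *   anything else
 *       -> legacy replication of a standard copy from one randomly chosen
 *          standard source.
 */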

void ChunkWorker::deleteInvalidChunkParts(Chunk *c) {
	for (auto &part : c->parts) {
		if (matocsserv_deletion_counter(part.server()) < TmpMaxDel) {
			if (!part.is_valid()) {
				if (part.state == ChunkPart::DEL) {
					lzfs_pretty_syslog(LOG_WARNING,
					       "chunk hasn't been deleted since the previous loop - "
					       "retrying");
				}
				part.state = ChunkPart::DEL;
				stats_deletions++;
				matocsserv_send_deletechunk(part.server(), c->chunkid, 0, part.type);
				inforec_.done.del_invalid++;
				deleteDone_++;
			}
		} else {
			if (part.state == ChunkPart::INVALID) {
				inforec_.notdone.del_invalid++;
				deleteNotDone_++;
			}
		}
	}
}

void ChunkWorker::deleteAllChunkParts(Chunk *c) {
	for (auto &part : c->parts) {
		if (matocsserv_deletion_counter(part.server()) < TmpMaxDel) {
			if (part.is_valid() && !part.is_busy()) {
				c->deleteCopy(part);
				c->needverincrease = 1;
				stats_deletions++;
				matocsserv_send_deletechunk(part.server(), c->chunkid, c->version,
				                            part.type);
				inforec_.done.del_unused++;
				deleteDone_++;
			}
		} else {
			if (part.state == ChunkPart::VALID || part.state == ChunkPart::TDVALID) {
				inforec_.notdone.del_unused++;
				deleteNotDone_++;
			}
		}
	}
}

bool ChunkWorker::replicateChunkPart(Chunk *c, Goal::Slice::Type slice_type, int slice_part,
					ChunkCopiesCalculator &calc, const IpCounter &ip_counter) {
	std::vector<matocsserventry *> servers;
	int skipped_replications = 0, valid_parts_count = 0, expected_copies = 0;
	bool tried_to_replicate = false;
	Goal::Slice::Labels replicate_labels;

	replicate_labels = calc.getLabelsToRecover(slice_type, slice_part);

	if (calc.getAvailable().find(slice_type) != calc.getAvailable().end()) {
		valid_parts_count =
		        Goal::Slice::countLabels(calc.getAvailable()[slice_type][slice_part]);
	}

	expected_copies = Goal::Slice::countLabels(calc.getTarget()[slice_type][slice_part]);

	uint32_t min_chunkserver_version = getMinChunkserverVersion(c, ChunkPartType(slice_type, slice_part));

	for (const auto &label_and_count : replicate_labels) {
		tried_to_replicate = true;

		if (jobsnorepbefore >= eventloop_time()) {
			break;
		}

		if (label_and_count.first == MediaLabel::kWildcard) {
			if (!replicationDelayInfoForAll.replicationAllowed(
			            label_and_count.second)) {
				continue;
			}
		} else if (!replicationDelayInfoForLabel[label_and_count.first].replicationAllowed(
		                   label_and_count.second)) {
			skipped_replications += label_and_count.second;
			continue;
		}

		// Get a list of possible destination servers
		int total_matching, returned_matching, temporarily_unavailable;
		matocsserv_getservers_lessrepl(label_and_count.first, min_chunkserver_version, MaxWriteRepl,
		                               ip_counter, servers, total_matching, returned_matching,
		                               temporarily_unavailable);

		// Find a destination server for replication -- the first one without a copy of 'c'
		matocsserventry *destination = nullptr;
		matocsserventry *backup_destination = nullptr;
		for (const auto &server : servers) {
			if (!chunkPresentOnServer(c, server)) {
				destination = server;
				break;
			}
			if (backup_destination == nullptr && !chunkPresentOnServer(c, slice_type, server)) {
				backup_destination = server;
			}
		}

		// If no destination was found, fall back to the backup one, i.e. a server
		// which holds a copy of this chunk, but from a different slice. Do this
		// only if there are no available chunkservers in the system at all, not
		// if they merely reached their replication limit.
		if (destination == nullptr && temporarily_unavailable == 0) {
			// No servers are expected to become available soon,
			// so the backup server must be used.
			destination = backup_destination;
		}

		if (destination == nullptr) {
			// There is no server suitable for this replication to be written to.
			break;
		}

		if (!(label_and_count.first == MediaLabel::kWildcard ||
		      matocsserv_get_label(destination) == label_and_count.first)) {
			// The server we found doesn't match the requested label.
			if (total_matching > returned_matching) {
				// There is a server which matches the current label, but it has
				// exceeded the replication limit. In this case we won't try to use
				// servers with non-matching labels as our destination -- we will
				// wait for that server to be ready.
				skipped_replications += label_and_count.second;
				continue;
			}
			if (!RebalancingBetweenLabels && !c->isEndangered()
			    && calc.isSafeEnoughToWrite(gRedundancyLevel)) {
				// The chunk is not endangered, so we should prevent label spilling.
				// Only endangered chunks are replicated across labels.
				skipped_replications += label_and_count.second;
				continue;
			}
			if (valid_parts_count + skipped_replications >= expected_copies) {
				// Don't create copies on non-matching servers if there already are
				// enough replicas.
				continue;
			}
		}

		if (tryReplication(c, ChunkPartType(slice_type, slice_part), destination)) {
			inforec_.done.copy_undergoal++;
			return true;
		} else {
			// There is no server suitable for this replication to be read from;
			// there's no need to analyze other labels if no source server is free.
			skipped_replications += label_and_count.second;
			break;
		}
	}
	if (tried_to_replicate) {
		inforec_.notdone.copy_undergoal++;
		// Re-enqueue the chunk only if it was taken directly from the endangered
		// chunks queue, to avoid repetitions. If it was taken from the chunk
		// hashmap, the inEndangeredQueue bit would still be set.
		if (gEndangeredChunksServingLimit > 0 && Chunk::endangeredChunks.size() < gEndangeredChunksMaxCapacity
			&& !c->inEndangeredQueue && calc.getState() == ChunksAvailabilityState::kEndangered) {
			c->inEndangeredQueue = 1;
			Chunk::endangeredChunks.push_back(c);
		}
	}

	return false;
}

bool ChunkWorker::removeUnneededChunkPart(Chunk *c, Goal::Slice::Type slice_type, int slice_part,
					ChunkCopiesCalculator &calc, const IpCounter &ip_counter) {
	Goal::Slice::Labels remove_pool = calc.getRemovePool(slice_type, slice_part);
	if (remove_pool.empty()) {
		return false;
	}

	ChunkPart *candidate = nullptr;
	bool candidate_todel = false;
	int candidate_occurrence = 0;
	double candidate_usage = std::numeric_limits<double>::lowest();

	for (auto &part : c->parts) {
		if (!part.is_valid() || part.type != ChunkPartType(slice_type, slice_part)) {
			continue;
		}
		if (matocsserv_deletion_counter(part.server()) >= TmpMaxDel) {
			continue;
		}

		MediaLabel server_label = matocsserv_get_label(part.server());
		if (remove_pool.find(server_label) == remove_pool.end()) {
			continue;
		}

		bool is_todel = part.is_todel();
		double usage = matocsserv_get_usage(part.server());
		int occurrence = ip_counter.empty() ? 1 : ip_counter.at(matocsserv_get_servip(part.server()));

		if (std::make_tuple(is_todel, occurrence, usage) >
		      std::make_tuple(candidate_todel, candidate_occurrence, candidate_usage)) {
			candidate = &part;
			candidate_usage = usage;
			candidate_todel = is_todel;
			candidate_occurrence = occurrence;
		}
	}

	if (candidate &&
	    calc.canRemovePart(slice_type, slice_part, matocsserv_get_label(candidate->server()))) {
		c->deleteCopy(*candidate);
		c->needverincrease = 1;
		stats_deletions++;
		matocsserv_send_deletechunk(candidate->server(), c->chunkid, 0, candidate->type);

		int overgoal_copies = calc.countPartsToMove(slice_type, slice_part).second;

		inforec_.done.del_overgoal++;
		deleteDone_++;
		inforec_.notdone.del_overgoal += overgoal_copies - 1;
		deleteNotDone_ += overgoal_copies - 1;

		return true;
	}

	return false;
}
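
/*
 * Candidate selection above leans on std::tuple's lexicographic operator>:
 * parts marked "to delete" win first, then parts on the most crowded IP,
 * then parts on the fullest server. A minimal, self-contained sketch of the
 * same ordering trick (the Cand type and its fields are illustrative only):
 *
 *   #include <tuple>
 *
 *   struct Cand { bool todel; int ip_count; double usage; };
 *
 *   // True when 'a' is a better deletion candidate than 'b': the comparison
 *   // falls through to the next field only on ties.
 *   bool better(const Cand &a, const Cand &b) {
 *       return std::make_tuple(a.todel, a.ip_count, a.usage) >
 *              std::make_tuple(b.todel, b.ip_count, b.usage);
 *   }
 */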

bool ChunkWorker::rebalanceChunkParts(Chunk *c, ChunkCopiesCalculator &calc, bool only_todel, const IpCounter &ip_counter) {
	if (!only_todel) {
		double min_usage = sortedServers_.front().disk_usage;
		double max_usage = sortedServers_.back().disk_usage;
		if ((max_usage - min_usage) <= gAcceptableDifference) {
			return false;
		}
	}

	// Consider moving each copy to a server with much lower disk usage.
	// There are at least two servers with a disk usage difference greater than
	// gAcceptableDifference, so it's worth checking.
	for (const auto &part : c->parts) {
		if (!part.is_valid()) {
			continue;
		}

		if (only_todel && !part.is_todel()) {
			continue;
		}

		auto current_ip = matocsserv_get_servip(part.server());
		auto it = ip_counter.find(current_ip);
		auto current_ip_count = it != ip_counter.end() ? it->second : 0;

		MediaLabel current_copy_label = matocsserv_get_label(part.server());
		double current_copy_disk_usage = matocsserv_get_usage(part.server());
		// Look for a server whose disk usage is much lower than
		// current_copy_disk_usage. If such a server exists, consider creating
		// a new copy of this chunk there. First, choose all possible candidates
		// for the destination server: we consider only servers with the same
		// label if rebalancing between labels is turned off or if the goal
		// requires our copy to exist on a server labeled 'current_copy_label'.
		bool multi_label_rebalance =
		        RebalancingBetweenLabels &&
		        (current_copy_label == MediaLabel::kWildcard ||
		         calc.canMovePartToDifferentLabel(part.type.getSliceType(),
		                                          part.type.getSlicePart(),
		                                          current_copy_label));

		uint32_t min_chunkserver_version = getMinChunkserverVersion(c, part.type);

		const ServersWithUsage &sorted_servers =
		        multi_label_rebalance ? sortedServers_
		                              : labeledSortedServers_[current_copy_label];

		for (const auto &empty_server : sorted_servers) {
			if (!only_todel && gAvoidSameIpChunkservers) {
				auto empty_server_ip = matocsserv_get_servip(empty_server.server);
				auto it = ip_counter.find(empty_server_ip);
				auto empty_server_ip_count = it != ip_counter.end() ? it->second : 0;
				if (empty_server_ip != current_ip && empty_server_ip_count >= current_ip_count) {
					continue;
				}
			}

			if (!only_todel && empty_server.disk_usage >
			    current_copy_disk_usage - gAcceptableDifference) {
				break;  // No more suitable destination servers
				        // (the remaining servers have higher usage)
			}
			if (matocsserv_get_version(empty_server.server) < min_chunkserver_version) {
				continue;
			}
			if (chunkPresentOnServer(c, part.type.getSliceType(), empty_server.server)) {
				continue;  // A copy is already here
			}
			if (matocsserv_replication_write_counter(empty_server.server) >= MaxWriteRepl) {
				continue;  // We can't create a new copy here
			}
			if (tryReplication(c, part.type, empty_server.server)) {
				inforec_.copy_rebalance++;
				return true;
			}
		}
	}

	return false;
}
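
/*
 * Worked example for the threshold above, assuming disk usage is expressed
 * as a 0..1 fraction: with the default ACCEPTABLE_DIFFERENCE of 0.1, a
 * cluster whose emptiest server is at 0.70 usage and whose fullest is at
 * 0.85 (difference 0.15 > 0.1) qualifies for rebalancing, and a copy held
 * at 0.85 may move to any server whose usage is below 0.85 - 0.1 = 0.75.
 */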

bool ChunkWorker::rebalanceChunkPartsWithSameIp(Chunk *c, ChunkCopiesCalculator &calc, const IpCounter &ip_counter) {
	if (!gAvoidSameIpChunkservers) {
		return false;
	}

	for (const auto &part : c->parts) {
		if (!part.is_valid()) {
			continue;
		}

		auto current_ip = matocsserv_get_servip(part.server());
		auto it = ip_counter.find(current_ip);
		auto current_ip_count = it != ip_counter.end() ? it->second : 0;

		MediaLabel current_copy_label = matocsserv_get_label(part.server());

		bool multi_label_rebalance =
		        RebalancingBetweenLabels &&
		        (current_copy_label == MediaLabel::kWildcard ||
		         calc.canMovePartToDifferentLabel(part.type.getSliceType(),
		                                          part.type.getSlicePart(),
		                                          current_copy_label));

		uint32_t min_chunkserver_version = getMinChunkserverVersion(c, part.type);

		const ServersWithUsage &sorted_servers =
		        multi_label_rebalance ? sortedServers_
		                              : labeledSortedServers_[current_copy_label];

		ServersWithUsage sorted_by_ip_count;
		sorted_by_ip_count.resize(sorted_servers.size());
		counting_sort_copy(sorted_servers.begin(), sorted_servers.end(), sorted_by_ip_count.begin(),
			           [&ip_counter](const ServerWithUsage& elem) {
			                  auto ip = matocsserv_get_servip(elem.server);
			                  auto it = ip_counter.find(ip);
			                  return it != ip_counter.end() ? it->second : 0;
			           });

		for (const auto &empty_server : sorted_by_ip_count) {
			auto empty_server_ip = matocsserv_get_servip(empty_server.server);
			auto it = ip_counter.find(empty_server_ip);
			auto empty_server_ip_count = it != ip_counter.end() ? it->second : 0;
			if (empty_server_ip_count >= (current_ip_count - 1)) {
				break;
			}

			if (matocsserv_get_version(empty_server.server) < min_chunkserver_version) {
				continue;
			}
			if (chunkPresentOnServer(c, part.type.getSliceType(), empty_server.server)) {
				continue;  // A copy is already here
			}
			if (matocsserv_replication_write_counter(empty_server.server) >= MaxWriteRepl) {
				continue;  // We can't create a new copy here
			}
			if (tryReplication(c, part.type, empty_server.server)) {
				inforec_.copy_rebalance++;
				return true;
			}
		}
	}

	return false;
}
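
/*
 * counting_sort_copy above orders candidate destinations by how many chunk
 * parts already sit on their IP, fewest first, while keeping the disk-usage
 * order within each count (a stable sort by an integer key). Under that
 * assumption, a plain-STL equivalent would look like this sketch (Server
 * and its fields are illustrative stand-ins):
 *
 *   #include <algorithm>
 *   #include <vector>
 *
 *   struct Server { int ip_parts; double usage; };
 *
 *   void sort_by_ip_count(std::vector<Server> &servers) {
 *       // stable_sort preserves the existing usage order among servers
 *       // whose IPs host the same number of parts.
 *       std::stable_sort(servers.begin(), servers.end(),
 *                        [](const Server &a, const Server &b) {
 *                            return a.ip_parts < b.ip_parts;
 *                        });
 *   }
 */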

void ChunkWorker::doChunkJobs(Chunk *c, uint16_t serverCount) {
	// step 0. update the chunk's statistics
	// (useful e.g. if the definitions of goals have changed)
	chunk_handle_disconnected_copies(c);
	c->updateStats();
	if (serverCount == 0) {
		return;
	}

	int invalid_parts = 0;
	ChunkCopiesCalculator calc(c->getGoal());

	// A chunk is in a degenerate state if it has more than one part
	// on the same chunkserver (e.g. 1 std and 1 xor).
	// TODO(sarna): this flat_set should be removed after
	// 'slists' are rewritten to use sensible data structures.
	bool degenerate = false;
	flat_set<matocsserventry *, small_vector<matocsserventry *, 64>> servers;

	// step 1. calculate the number of valid and invalid copies
	for (const auto &part : c->parts) {
		if (part.is_valid()) {
			calc.addPart(part.type, matocsserv_get_label(part.server()));
			if (!degenerate) {
				degenerate = servers.count(part.server()) > 0;
				servers.insert(part.server());
			}
		} else {
			++invalid_parts;
		}
	}
	calc.optimize();

	// step 1a. count the chunk parts residing on servers with the same IP
	IpCounter ip_occurrence;
	if (gAvoidSameIpChunkservers) {
		for (auto &part : c->parts) {
			if (part.is_valid()) {
				++ip_occurrence[matocsserv_get_servip(part.server())];
			}
		}
	}

	// step 2. check the number of copies
	if (c->isLost() && invalid_parts > 0 && c->fileCount() > 0) {
		lzfs_pretty_syslog(LOG_WARNING, "chunk %016" PRIX64 " does not have enough valid parts (%d) -"
		                   " consider repairing it manually", c->chunkid, invalid_parts);
		for (const auto &part : c->parts) {
			if (!part.is_valid()) {
				lzfs_pretty_syslog(LOG_NOTICE, "chunk %016" PRIX64 "_%08" PRIX32 " - invalid part on (%s - ver:%08" PRIX32 ")",
				c->chunkid, c->version, matocsserv_getstrip(part.server()), part.version);
			}
		}
		return;
	}

	// step 3. delete invalid parts
	deleteInvalidChunkParts(c);

	// step 4. return if the chunk is in the middle of some operation
	if (c->operation != Chunk::NONE || (c->isLocked())) {
		return;
	}

	// step 5. check the busy count
	for (const auto &part : c->parts) {
		if (part.is_busy()) {
			lzfs_pretty_syslog(LOG_WARNING, "chunk %016" PRIX64 " has unexpected BUSY copies",
			       c->chunkid);
			return;
		}
	}

	// step 6. delete an unused chunk
	if (c->fileCount() == 0) {
		deleteAllChunkParts(c);
		return;
	}

	if (c->isLost()) {
		return;
	}

	// step 7. check whether the chunk needs any replication
	for (const auto &slice : calc.getTarget()) {
		for (int i = 0; i < slice.size(); ++i) {
			if (replicateChunkPart(c, slice.getType(), i, calc, ip_occurrence)) {
				return;
			}
		}
	}

	// Do not remove any parts if more than one part resides on one chunkserver.
	if (degenerate && calc.countPartsToRecover() > 0) {
		return;
	}

	// step 8. if the chunk has too many copies then delete some of them
	for (const auto &slice : calc.getAvailable()) {
		for (int i = 0; i < slice.size(); ++i) {
			std::pair<int, int> operations = calc.countPartsToMove(slice.getType(), i);
			if (operations.first > 0 || operations.second == 0) {
				// do not remove elements if some are missing
				continue;
			}

			if (removeUnneededChunkPart(c, slice.getType(), i, calc, ip_occurrence)) {
				return;
			}
		}
	}

	// step 9. if the chunk has parts marked as "to delete" then move them to other servers
	if (rebalanceChunkParts(c, calc, true, ip_occurrence)) {
		return;
	}

	if (chunksinfo.notdone.copy_undergoal > 0 && chunksinfo.done.copy_undergoal > 0) {
		return;
	}

	// step 10. move chunk parts residing on chunkservers with the same IP
	if (rebalanceChunkPartsWithSameIp(c, calc, ip_occurrence)) {
		return;
	}

	// step 11. if there is too big a difference between chunkservers then copy the chunk
	// from a server with high disk usage to a server with low disk usage
	if (rebalanceChunkParts(c, calc, false, ip_occurrence)) {
		return;
	}
}

bool ChunkWorker::deleteUnusedChunks() {
	while (stack_.node != nullptr) {
		chunk_handle_disconnected_copies(stack_.node);
		if (stack_.node->fileCount() == 0 && stack_.node->parts.empty()) {
			// If the current chunk of the zombie loop is about to be deleted,
			// advance that loop to the next chunk.
			if (stack_.node == gCurrentChunkInZombieLoop) {
				gCurrentChunkInZombieLoop = gCurrentChunkInZombieLoop->next;
			}
			// Something could have been inserted between prev and node (while
			// we yielded), so we need to make prev valid again.
			while (stack_.prev && stack_.prev->next != stack_.node) {
				stack_.prev = stack_.prev->next;
			}

			assert((!stack_.prev && gChunksMetadata->chunkhash[stack_.current_bucket] == stack_.node) ||
			       (stack_.prev && stack_.prev->next == stack_.node));

			if (stack_.prev) {
				stack_.prev->next = stack_.node->next;
			} else {
				gChunksMetadata->chunkhash[stack_.current_bucket] =
				        stack_.node->next;
			}

			Chunk *tmp = stack_.node->next;
			chunk_delete(stack_.node);
			stack_.node = tmp;
		} else {
			stack_.prev = stack_.node;
			stack_.node = stack_.node->next;
		}

		if (stack_.watchdog.expired()) {
			return false;
		}
	}

	return true;
}
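
/*
 * The unlink step above is the classic singly-linked-list removal with a
 * trailing 'prev' pointer, plus coroutine bookkeeping. A minimal sketch of
 * the bare operation (Node and should_delete are hypothetical):
 *
 *   struct Node { Node *next; };
 *
 *   void remove_matching(Node *&head, bool (*should_delete)(Node *)) {
 *       Node *prev = nullptr;
 *       Node *node = head;
 *       while (node) {
 *           if (should_delete(node)) {
 *               Node *next = node->next;
 *               if (prev) {
 *                   prev->next = next;   // bypass the doomed node
 *               } else {
 *                   head = next;         // removing the list head
 *               }
 *               delete node;
 *               node = next;             // 'prev' stays where it was
 *           } else {
 *               prev = node;
 *               node = node->next;
 *           }
 *       }
 *   }
 */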

void ChunkWorker::mainLoop() {
	Chunk *c;

	reenter(this) {
		stack_.work_limit.setMaxDuration(std::chrono::milliseconds(ChunksLoopTimeout));
		stack_.work_limit.start();
		stack_.watchdog.start();
		stack_.chunks_done_count = 0;
		stack_.buckets_done_count = 0;

		if (starttime + gOperationsDelayInit > eventloop_time()) {
			return;
		}

		double min_usage, max_usage;
		matocsserv_usagedifference(&min_usage, &max_usage, &stack_.usable_server_count,
		                           nullptr);

		if (min_usage > max_usage) {
			return;
		}

		doEverySecondTasks();

		if (jobsnorepbefore < eventloop_time()) {
			stack_.endangered_to_serve = gEndangeredChunksServingLimit;
			while (stack_.endangered_to_serve > 0 && !Chunk::endangeredChunks.empty()) {
				c = Chunk::endangeredChunks.front();
				Chunk::endangeredChunks.pop_front();
				// If the queued chunk is obsolete (e.g. it was freed while in
				// the queue), do not proceed with chunk jobs.
				if (c->inEndangeredQueue == 1) {
					c->inEndangeredQueue = 0;
					doChunkJobs(c, stack_.usable_server_count);
				}
				--stack_.endangered_to_serve;

				if (stack_.watchdog.expired()) {
					yield;
					stack_.watchdog.start();
				}
			}
		}

		while (stack_.buckets_done_count < HashSteps &&
		       stack_.chunks_done_count < HashCPS) {
			if (stack_.current_bucket == 0) {
				doEveryLoopTasks();
			}

			if (stack_.watchdog.expired()) {
				yield;
				stack_.watchdog.start();
			}

			// delete unused chunks
			stack_.prev = nullptr;
			stack_.node = gChunksMetadata->chunkhash[stack_.current_bucket];
			while (!deleteUnusedChunks()) {
				yield;
				stack_.watchdog.start();
			}

			// regenerate usable_server_count
			matocsserv_usagedifference(nullptr, nullptr, &stack_.usable_server_count,
			                           nullptr);

			stack_.node = gChunksMetadata->chunkhash[stack_.current_bucket];
			while (stack_.node) {
				doChunkJobs(stack_.node, stack_.usable_server_count);
				++stack_.chunks_done_count;
				stack_.node = stack_.node->next;

				if (stack_.watchdog.expired()) {
					yield;
					stack_.watchdog.start();
					matocsserv_usagedifference(nullptr, nullptr,
					                           &stack_.usable_server_count,
					                           nullptr);
				}
			}

			// Any odd step is coprime with HASHSIZE (a power of 2), so
			// repeatedly adding it visits every bucket exactly once per cycle.
			stack_.current_bucket += 123;
			stack_.current_bucket %= HASHSIZE;
			++stack_.buckets_done_count;

			if (stack_.work_limit.expired()) {
				break;
			}
		}
	}
}
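
/*
 * mainLoop is a stackless coroutine: 'reenter' and 'yield' come from
 * common/coroutine.h and appear to work like the Boost.Asio/Duff's-device
 * coroutine macros they resemble, which is why all loop state lives in
 * stack_ rather than in locals. A minimal sketch of the pattern, assuming
 * the same macro semantics (the Ticker type is illustrative):
 *
 *   #include <cstdio>
 *
 *   struct Ticker : coroutine {
 *       int i;  // persists across yields; a local variable would not
 *       void run() {
 *           reenter(this) {
 *               for (i = 0; i < 3; ++i) {
 *                   printf("tick %d\n", i);
 *                   yield;  // return here; the next run() resumes after this line
 *               }
 *           }
 *       }
 *   };
 *
 *   // Calling run() four times prints "tick 0", "tick 1", "tick 2",
 *   // and then the coroutine completes.
 */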

static std::unique_ptr<ChunkWorker> gChunkWorker;

void chunk_jobs_main(void) {
	if (gChunkWorker->is_complete()) {
		gChunkWorker->reset();
	}
}

void chunk_jobs_process_bit(void) {
	if (!gChunkWorker->is_complete()) {
		gChunkWorker->mainLoop();
		if (!gChunkWorker->is_complete()) {
			eventloop_make_next_poll_nonblocking();
		}
	}
}

#endif

constexpr uint32_t kSerializedChunkSizeNoLockId = 16;
constexpr uint32_t kSerializedChunkSizeWithLockId = 20;
#define CHUNKCNT 1000

#ifdef METARESTORE

void chunk_dump(void) {
	Chunk *c;
	uint32_t i;

	for (i=0 ; i<HASHSIZE ; i++) {
		for (c=gChunksMetadata->chunkhash[i] ; c ; c=c->next) {
			printf("*|i:%016" PRIX64 "|v:%08" PRIX32 "|g:%" PRIu8 "|t:%10" PRIu32 "\n",c->chunkid,c->version,c->highestIdGoal(),c->lockedto);
		}
	}
}

#endif

int chunk_load(FILE *fd, bool loadLockIds) {
	uint8_t hdr[8];
	const uint8_t *ptr;
	int32_t r;
	Chunk *c;
// chunkdata
	uint64_t chunkid;

	if (fread(hdr,1,8,fd)!=8) {
		return -1;
	}
	ptr = hdr;
	gChunksMetadata->nextchunkid = get64bit(&ptr);
	int32_t serializedChunkSize = (loadLockIds
			? kSerializedChunkSizeWithLockId : kSerializedChunkSizeNoLockId);
	std::vector<uint8_t> loadbuff(serializedChunkSize);
	for (;;) {
		r = fread(loadbuff.data(), 1, serializedChunkSize, fd);
		if (r != serializedChunkSize) {
			return -1;
		}
		ptr = loadbuff.data();
		chunkid = get64bit(&ptr);
		if (chunkid > 0) {
			uint32_t version = get32bit(&ptr);
			c = chunk_new(chunkid, version);
			c->lockedto = get32bit(&ptr);
			if (loadLockIds) {
				c->lockid = get32bit(&ptr);
			}
		} else {
			uint32_t version = get32bit(&ptr);
			uint32_t lockedto = get32bit(&ptr);
			if (version == 0 && lockedto == 0) {
				return 0;
			} else {
				return -1;
			}
		}
	}
	return 0;       // unreachable
}
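
/*
 * On-disk layout handled by chunk_load/chunk_store, as implied by the
 * get/put calls above: an 8-byte header holding nextchunkid, followed by
 * fixed-size records
 *
 *   chunkid  : u64   (0 marks the terminator record)
 *   version  : u32
 *   lockedto : u32
 *   lockid   : u32   (only in the 20-byte, with-lock-id format)
 *
 * terminated by an all-zero record. A record with chunkid == 0 but a
 * non-zero version or lockedto is treated as corruption (-1). chunk_store
 * below always writes the 20-byte format.
 */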

void chunk_store(FILE *fd) {
	passert(gChunksMetadata);
	uint8_t hdr[8];
	uint8_t storebuff[kSerializedChunkSizeWithLockId * CHUNKCNT];
	uint8_t *ptr;
	uint32_t i,j;
	Chunk *c;
// chunkdata
	uint64_t chunkid;
	uint32_t version;
	uint32_t lockedto, lockid;
	ptr = hdr;
	put64bit(&ptr,gChunksMetadata->nextchunkid);
	if (fwrite(hdr,1,8,fd)!=(size_t)8) {
		return;
	}
	j=0;
	ptr = storebuff;
	for (i=0 ; i<HASHSIZE ; i++) {
		for (c=gChunksMetadata->chunkhash[i] ; c ; c=c->next) {
#ifndef METARESTORE
			chunk_handle_disconnected_copies(c);
#endif
			chunkid = c->chunkid;
			put64bit(&ptr,chunkid);
			version = c->version;
			put32bit(&ptr,version);
			lockedto = c->lockedto;
			lockid = c->lockid;
			put32bit(&ptr,lockedto);
			put32bit(&ptr,lockid);
			j++;
			if (j==CHUNKCNT) {
				size_t writtenBlockSize = kSerializedChunkSizeWithLockId * CHUNKCNT;
				if (fwrite(storebuff, 1, writtenBlockSize, fd) != writtenBlockSize) {
					return;
				}
				j=0;
				ptr = storebuff;
			}
		}
	}
	memset(ptr, 0, kSerializedChunkSizeWithLockId);
	j++;
	size_t writtenBlockSize = kSerializedChunkSizeWithLockId * j;
	if (fwrite(storebuff, 1, writtenBlockSize, fd) != writtenBlockSize) {
		return;
	}
}

void chunk_unload(void) {
	delete gChunksMetadata;
	gChunksMetadata = nullptr;
}

void chunk_newfs(void) {
#ifndef METARESTORE
	Chunk::count = 0;
#endif
	gChunksMetadata->nextchunkid = 1;
}

#ifndef METARESTORE
void chunk_become_master() {
	starttime = eventloop_time();
	jobsnorepbefore = starttime + gOperationsDelayInit;
	gChunkWorker = std::unique_ptr<ChunkWorker>(new ChunkWorker());
	gChunkLoopEventHandle = eventloop_timeregister_ms(ChunksLoopPeriod, chunk_jobs_main);
	eventloop_eachloopregister(chunk_jobs_process_bit);
	return;
}

void chunk_reload(void) {
	uint32_t repl;
	uint32_t looptime;

	// Set the deprecated values first and override them if a newer definition is found.
	gOperationsDelayInit = cfg_getuint32("REPLICATIONS_DELAY_INIT", 300);
	gOperationsDelayDisconnect = cfg_getuint32("REPLICATIONS_DELAY_DISCONNECT", 3600);
	gOperationsDelayInit = cfg_getuint32("OPERATIONS_DELAY_INIT", gOperationsDelayInit);
	gOperationsDelayDisconnect = cfg_getuint32("OPERATIONS_DELAY_DISCONNECT", gOperationsDelayDisconnect);
	gAvoidSameIpChunkservers = cfg_getuint32("AVOID_SAME_IP_CHUNKSERVERS", 0);
	gRedundancyLevel = cfg_getuint32("REDUNDANCY_LEVEL", 0);

	uint32_t disableChunksDel = cfg_getuint32("DISABLE_CHUNKS_DEL", 0);
	if (disableChunksDel) {
		MaxDelSoftLimit = MaxDelHardLimit = 0;
	} else {
		uint32_t oldMaxDelSoftLimit = MaxDelSoftLimit;
		uint32_t oldMaxDelHardLimit = MaxDelHardLimit;

		MaxDelSoftLimit = cfg_getuint32("CHUNKS_SOFT_DEL_LIMIT", 10);
		if (cfg_isdefined("CHUNKS_HARD_DEL_LIMIT")) {
			MaxDelHardLimit = cfg_getuint32("CHUNKS_HARD_DEL_LIMIT", 25);
			if (MaxDelHardLimit < MaxDelSoftLimit) {
				MaxDelSoftLimit = MaxDelHardLimit;
				lzfs_pretty_syslog(LOG_WARNING, "CHUNKS_SOFT_DEL_LIMIT is greater than CHUNKS_HARD_DEL_LIMIT - using CHUNKS_HARD_DEL_LIMIT for both");
			}
		} else {
			MaxDelHardLimit = 3 * MaxDelSoftLimit;
		}
		if (MaxDelSoftLimit == 0) {
			MaxDelSoftLimit = oldMaxDelSoftLimit;
			MaxDelHardLimit = oldMaxDelHardLimit;
		}
	}
	if (TmpMaxDelFrac < MaxDelSoftLimit) {
		TmpMaxDelFrac = MaxDelSoftLimit;
	}
	if (TmpMaxDelFrac > MaxDelHardLimit) {
		TmpMaxDelFrac = MaxDelHardLimit;
	}
	if (TmpMaxDel < MaxDelSoftLimit) {
		TmpMaxDel = MaxDelSoftLimit;
	}
	if (TmpMaxDel > MaxDelHardLimit) {
		TmpMaxDel = MaxDelHardLimit;
	}

	repl = cfg_getuint32("CHUNKS_WRITE_REP_LIMIT", 2);
	if (repl > 0) {
		MaxWriteRepl = repl;
	}

	repl = cfg_getuint32("CHUNKS_READ_REP_LIMIT", 10);
	if (repl > 0) {
		MaxReadRepl = repl;
	}

	ChunksLoopPeriod = cfg_get_minmaxvalue<uint32_t>("CHUNKS_LOOP_PERIOD", 1000, MINCHUNKSLOOPPERIOD, MAXCHUNKSLOOPPERIOD);
	if (gChunkLoopEventHandle) {
		eventloop_timechange_ms(gChunkLoopEventHandle, ChunksLoopPeriod);
	}

	repl = cfg_get_minmaxvalue<uint32_t>("CHUNKS_LOOP_MAX_CPU", 60, MINCHUNKSLOOPCPU, MAXCHUNKSLOOPCPU);
	ChunksLoopTimeout = repl * ChunksLoopPeriod / 100;

	if (cfg_isdefined("CHUNKS_LOOP_TIME")) {
		looptime = cfg_get_minmaxvalue<uint32_t>("CHUNKS_LOOP_TIME", 300, MINLOOPTIME, MAXLOOPTIME);
		uint64_t scaled_looptime = std::max((uint64_t)1000 * looptime / ChunksLoopPeriod, (uint64_t)1);
		HashSteps = 1 + ((HASHSIZE) / scaled_looptime);
		HashCPS   = 0xFFFFFFFF;
	} else {
		looptime = cfg_get_minmaxvalue<uint32_t>("CHUNKS_LOOP_MIN_TIME", 300, MINLOOPTIME, MAXLOOPTIME);
		HashCPS = cfg_get_minmaxvalue<uint32_t>("CHUNKS_LOOP_MAX_CPS", 100000, MINCPS, MAXCPS);
		uint64_t scaled_looptime = std::max((uint64_t)1000 * looptime / ChunksLoopPeriod, (uint64_t)1);
		HashSteps = 1 + ((HASHSIZE) / scaled_looptime);
		HashCPS   = (uint64_t)ChunksLoopPeriod * HashCPS / 1000;
	}
	double endangeredChunksPriority = cfg_ranged_get("ENDANGERED_CHUNKS_PRIORITY", 0.0, 0.0, 1.0);
	gEndangeredChunksServingLimit = HashSteps * endangeredChunksPriority;
	gEndangeredChunksMaxCapacity = cfg_get("ENDANGERED_CHUNKS_MAX_CAPACITY", static_cast<uint64_t>(1024*1024UL));
	gAcceptableDifference = cfg_ranged_get("ACCEPTABLE_DIFFERENCE", 0.1, 0.001, 10.0);
	RebalancingBetweenLabels = cfg_getuint32("CHUNKS_REBALANCING_BETWEEN_LABELS", 0) == 1;
}
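
/*
 * Worked example of the loop pacing computed above, using the defaults:
 * with CHUNKS_LOOP_PERIOD = 1000 ms and CHUNKS_LOOP_MIN_TIME = 300 s,
 * scaled_looptime = 1000 * 300 / 1000 = 300 invocations per full pass, so
 * HashSteps = 1 + 0x100000 / 300 = 3496 buckets per invocation; with
 * CHUNKS_LOOP_MAX_CPS = 100000, HashCPS = 1000 * 100000 / 1000 = 100000
 * chunks per invocation. Whichever limit is hit first ends the invocation
 * (see the condition in ChunkWorker::mainLoop).
 */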
#endif

int chunk_strinit(void) {
	gChunksMetadata = new ChunksMetadata;

#ifndef METARESTORE
	Chunk::count = 0;
	for (int i = 0; i < CHUNK_MATRIX_SIZE; ++i) {
		for (int j = 0; j < CHUNK_MATRIX_SIZE; ++j) {
			Chunk::allFullChunkCopies[i][j] = 0;
		}
	}
	Chunk::allChunksAvailability = ChunksAvailabilityState();
	Chunk::allChunksReplicationState = ChunksReplicationState();

	uint32_t disableChunksDel = cfg_getuint32("DISABLE_CHUNKS_DEL", 0);
	gOperationsDelayInit = cfg_getuint32("REPLICATIONS_DELAY_INIT", 300);
	gOperationsDelayDisconnect = cfg_getuint32("REPLICATIONS_DELAY_DISCONNECT", 3600);
	gOperationsDelayInit = cfg_getuint32("OPERATIONS_DELAY_INIT", gOperationsDelayInit);
	gOperationsDelayDisconnect = cfg_getuint32("OPERATIONS_DELAY_DISCONNECT", gOperationsDelayDisconnect);
	gAvoidSameIpChunkservers = cfg_getuint32("AVOID_SAME_IP_CHUNKSERVERS", 0);
	gRedundancyLevel = cfg_getuint32("REDUNDANCY_LEVEL", 0);

	if (disableChunksDel) {
		MaxDelHardLimit = MaxDelSoftLimit = 0;
	} else {
		MaxDelSoftLimit = cfg_getuint32("CHUNKS_SOFT_DEL_LIMIT", 10);
		if (cfg_isdefined("CHUNKS_HARD_DEL_LIMIT")) {
			MaxDelHardLimit = cfg_getuint32("CHUNKS_HARD_DEL_LIMIT", 25);
			if (MaxDelHardLimit < MaxDelSoftLimit) {
				MaxDelSoftLimit = MaxDelHardLimit;
				lzfs_pretty_syslog(LOG_WARNING, "%s: CHUNKS_SOFT_DEL_LIMIT is greater than "
					"CHUNKS_HARD_DEL_LIMIT - using CHUNKS_HARD_DEL_LIMIT for both",
					cfg_filename().c_str());
			}
		} else {
			MaxDelHardLimit = 3 * MaxDelSoftLimit;
		}
		if (MaxDelSoftLimit == 0) {
			throw InitializeException(cfg_filename() + ": CHUNKS_SOFT_DEL_LIMIT is zero");
		}
	}
	TmpMaxDelFrac = MaxDelSoftLimit;
	TmpMaxDel = MaxDelSoftLimit;
	MaxWriteRepl = cfg_getuint32("CHUNKS_WRITE_REP_LIMIT", 2);
	MaxReadRepl = cfg_getuint32("CHUNKS_READ_REP_LIMIT", 10);
	if (MaxReadRepl == 0) {
		throw InitializeException(cfg_filename() + ": CHUNKS_READ_REP_LIMIT is zero");
	}
	if (MaxWriteRepl == 0) {
		throw InitializeException(cfg_filename() + ": CHUNKS_WRITE_REP_LIMIT is zero");
	}

	ChunksLoopPeriod  = cfg_get_minmaxvalue<uint32_t>("CHUNKS_LOOP_PERIOD", 1000, MINCHUNKSLOOPPERIOD, MAXCHUNKSLOOPPERIOD);
	uint32_t repl = cfg_get_minmaxvalue<uint32_t>("CHUNKS_LOOP_MAX_CPU", 60, MINCHUNKSLOOPCPU, MAXCHUNKSLOOPCPU);
	ChunksLoopTimeout = repl * ChunksLoopPeriod / 100;

	uint32_t looptime;
	if (cfg_isdefined("CHUNKS_LOOP_TIME")) {
		lzfs_pretty_syslog(LOG_WARNING,
				"%s: defining the loop time with the CHUNKS_LOOP_TIME option is "
				"deprecated - use CHUNKS_LOOP_MAX_CPS and CHUNKS_LOOP_MIN_TIME",
				cfg_filename().c_str());
		looptime = cfg_get_minmaxvalue<uint32_t>("CHUNKS_LOOP_TIME", 300, MINLOOPTIME, MAXLOOPTIME);
		uint64_t scaled_looptime = std::max((uint64_t)1000 * looptime / ChunksLoopPeriod, (uint64_t)1);
		HashSteps = 1 + ((HASHSIZE) / scaled_looptime);
		HashCPS   = 0xFFFFFFFF;
	} else {
		looptime = cfg_get_minmaxvalue<uint32_t>("CHUNKS_LOOP_MIN_TIME", 300, MINLOOPTIME, MAXLOOPTIME);
		HashCPS = cfg_get_minmaxvalue<uint32_t>("CHUNKS_LOOP_MAX_CPS", 100000, MINCPS, MAXCPS);
		uint64_t scaled_looptime = std::max((uint64_t)1000 * looptime / ChunksLoopPeriod, (uint64_t)1);
		HashSteps = 1 + ((HASHSIZE) / scaled_looptime);
		HashCPS   = (uint64_t)ChunksLoopPeriod * HashCPS / 1000;
	}
	double endangeredChunksPriority = cfg_ranged_get("ENDANGERED_CHUNKS_PRIORITY", 0.0, 0.0, 1.0);
	gEndangeredChunksServingLimit = HashSteps * endangeredChunksPriority;
	gEndangeredChunksMaxCapacity = cfg_get("ENDANGERED_CHUNKS_MAX_CAPACITY", static_cast<uint64_t>(1024*1024UL));
	gAcceptableDifference = cfg_ranged_get("ACCEPTABLE_DIFFERENCE", 0.1, 0.001, 10.0);
	RebalancingBetweenLabels = cfg_getuint32("CHUNKS_REBALANCING_BETWEEN_LABELS", 0) == 1;
	eventloop_reloadregister(chunk_reload);
	metadataserver::registerFunctionCalledOnPromotion(chunk_become_master);
	eventloop_eachloopregister(chunk_clean_zombie_servers_a_bit);
	if (metadataserver::isMaster()) {
		chunk_become_master();
	}
#endif
	return 1;
}