/*
   Copyright 2005-2010 Jakub Kruszona-Zawadzki, Gemius SA, 2013-2014 EditShare, 2013-2017 Skytechnology sp. z o.o.

   This file was part of MooseFS and is part of LizardFS.

   LizardFS is free software: you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation, version 3.

   LizardFS is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with LizardFS. If not, see <http://www.gnu.org/licenses/>.
*/

#include "common/platform.h"
#include "master/chunks.h"

#include <fcntl.h>
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <syslog.h>
#include <unistd.h>
#include <unordered_map>
#include <algorithm>
#include <deque>

#include "common/chunks_availability_state.h"
#include "common/chunk_copies_calculator.h"
#include "common/compact_vector.h"
#include "common/counting_sort.h"
#include "common/coroutine.h"
#include "common/datapack.h"
#include "common/exceptions.h"
#include "common/event_loop.h"
#include "common/flat_set.h"
#include "common/goal.h"
#include "common/hashfn.h"
#include "common/lizardfs_version.h"
#include "common/loop_watchdog.h"
#include "common/massert.h"
#include "common/slice_traits.h"
#include "common/small_vector.h"
#include "master/chunkserver_db.h"
#include "master/checksum.h"
#include "master/chunk_goal_counters.h"
#include "master/filesystem.h"
#include "master/get_servers_for_new_chunk.h"
#include "master/goal_cache.h"
#include "protocol/MFSCommunication.h"

#ifdef METARESTORE
# include <time.h>
#else
# include "common/cfg.h"
# include "common/main.h"
# include "common/random.h"
# include "master/matoclserv.h"
# include "master/matocsserv.h"
# include "master/topology.h"
#endif

#define MINLOOPTIME 1
#define MAXLOOPTIME 7200
#define MAXCPS 10000000
#define MINCPS 500
#define MINCHUNKSLOOPPERIOD 40
#define MAXCHUNKSLOOPPERIOD 10000
#define MINCHUNKSLOOPCPU 10
#define MAXCHUNKSLOOPCPU 90

#define HASHSIZE 0x100000
#define HASHPOS(chunkid) (((uint32_t)chunkid)&0xFFFFF)
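
// HASHSIZE is 2^20, so HASHPOS simply keeps the low 20 bits of the chunk id;
// for example, chunkid 0x123456789 lands in bucket 0x56789. Chunk ids are
// allocated sequentially (see nextchunkid below), so consecutive chunks
// spread evenly across the hash table.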

#define CHECKSUMSEED 78765491511151883ULL

#ifndef METARESTORE

static uint32_t gRedundancyLevel;
static uint64_t gEndangeredChunksServingLimit;
static uint64_t gEndangeredChunksMaxCapacity;
static uint64_t gDisconnectedCounter = 0;
bool gAvoidSameIpChunkservers = false;

struct ChunkPart {
	enum {
		INVALID = 0, /*!< Wrong version / or got info from chunkserver (IO error etc.) -> to delete. */
		DEL,         /*!< Deletion in progress. */
		BUSY,        /*!< Operation in progress. */
		VALID,       /*!< Ok. */
		TDBUSY,      /*!< To delete + BUSY. */
		TDVALID      /*!< Want to be deleted. */
	};

	uint32_t version;   /*!< Part version. */
	ChunkPartType type; /*!< Part type. */
	uint16_t csid : 13; /*!< Chunkserver id. */
	uint16_t state : 3; /*!< Chunk part state. */
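
	// Note: the two bit-fields above pack into a single uint16_t; 13 bits of
	// csid allow up to 8192 chunkservers, and 3 bits of state cover the six
	// enumerators INVALID..TDVALID.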

	ChunkPart() : version(0), type(), csid(0), state(INVALID) {
	}

	ChunkPart(const ChunkPart &other)
	    : version(other.version), type(other.type), csid(other.csid), state(other.state) {
	}

	ChunkPart(uint16_t part_csid, int part_state, uint32_t part_version,
	          const ChunkPartType &part_type)
	    : version(part_version), type(part_type), csid(part_csid), state(part_state) {
	}

	bool is_busy() const {
		return state == BUSY || state == TDBUSY;
	}

	bool is_valid() const {
		return state != INVALID && state != DEL;
	}

	bool is_todel() const {
		return state == TDVALID || state == TDBUSY;
	}

	void mark_busy() {
		switch (state) {
		case VALID:
			state = BUSY;
			break;
		case TDVALID:
			state = TDBUSY;
			break;
		default:
			sassert(!"ChunkPart::mark_busy(): wrong state");
		}
	}

	void unmark_busy() {
		switch (state) {
		case BUSY:
			state = VALID;
			break;
		case TDBUSY:
			state = TDVALID;
			break;
		default:
			sassert(!"ChunkPart::unmark_busy(): wrong state");
		}
	}

	void mark_todel() {
		switch (state) {
		case VALID:
			state = TDVALID;
			break;
		case BUSY:
			state = TDBUSY;
			break;
		default:
			sassert(!"ChunkPart::mark_todel(): wrong state");
		}
	}

	void unmark_todel() {
		switch (state) {
		case TDVALID:
			state = VALID;
			break;
		case TDBUSY:
			state = BUSY;
			break;
		default:
			sassert(!"ChunkPart::unmark_todel(): wrong state");
		}
	}

	matocsserventry *server() const {
		assert(csdb_find(csid));
		assert(csdb_find(csid)->eptr);
		return csdb_find(csid)->eptr;
	}
};
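
// State transitions handled above, for reference (a sketch derived from the
// methods themselves rather than from any external spec):
//
//    VALID  <-- mark_busy / unmark_busy -->  BUSY
//      ^                                      ^
//      |  mark_todel / unmark_todel           |  mark_todel / unmark_todel
//      v                                      v
//   TDVALID <-- mark_busy / unmark_busy --> TDBUSY
//
// INVALID and DEL are never produced by these helpers; parts reach them via
// Chunk::invalidateCopy() and Chunk::deleteCopy() further below.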

static void* gChunkLoopEventHandle = NULL;

static uint32_t gOperationsDelayDisconnect = 3600;
static uint32_t gOperationsDelayInit = 300;

static uint32_t MaxWriteRepl;
static uint32_t MaxReadRepl;
static uint32_t MaxDelSoftLimit;
static uint32_t MaxDelHardLimit;
static double TmpMaxDelFrac;
static uint32_t TmpMaxDel;
static uint32_t HashSteps;
static uint32_t HashCPS;
static uint32_t ChunksLoopPeriod;
static uint32_t ChunksLoopTimeout;
static double gAcceptableDifference;
static bool RebalancingBetweenLabels = false;

static uint32_t jobsnorepbefore;

static uint32_t starttime;
#endif // METARESTORE

class Chunk {
	static constexpr int kMaxStatCount = 15;
	static_assert(CHUNK_MATRIX_SIZE <= kMaxStatCount, "stats matrix size too big for internal stats storage");
	static_assert(ChunksAvailabilityState::kStateCount <= 3, "not enough space for chunk state");

public:
	/* chunk.operation */
	enum {
		NONE,
		CREATE,
		SET_VERSION,
		DUPLICATE,
		TRUNCATE,
		DUPTRUNC
	};

	uint64_t chunkid;
	uint64_t checksum;
	Chunk *next;
#ifndef METARESTORE
	compact_vector<ChunkPart> parts;
#endif
private: // public/private sections are mixed here to make the struct as small as possible
	ChunkGoalCounters goalCounters_;
public:
	uint32_t version;
	uint32_t lockid;
	uint32_t lockedto;
#ifndef METARESTORE
	uint8_t inEndangeredQueue:1;
	uint8_t needverincrease:1;
	uint8_t interrupted:1;
	uint8_t operation:3;
private:
	uint8_t allAvailabilityState_:2;
	uint8_t copiesInStats_:4;
	uint8_t allMissingParts_:4;
	uint8_t allRedundantParts_:4;
	uint8_t allFullCopies_:4;
#endif

public:
#ifndef METARESTORE
	static ChunksAvailabilityState allChunksAvailability;
	static ChunksReplicationState allChunksReplicationState;
	static uint64_t count;
	static uint64_t allFullChunkCopies[CHUNK_MATRIX_SIZE][CHUNK_MATRIX_SIZE];
	static std::deque<Chunk *> endangeredChunks;
	static GoalCache goalCache;
#endif

	void clear() {
		goalCounters_.clear();
		next = nullptr;
		chunkid = 0;
		version = 0;
		lockid = 0;
		lockedto = 0;
		checksum = 0;
#ifndef METARESTORE
		inEndangeredQueue = 0;
		needverincrease = 1;
		interrupted = 0;
		operation = Chunk::NONE;
		parts.clear();
		allMissingParts_ = 0;
		allRedundantParts_ = 0;
		allFullCopies_ = 0;
		allAvailabilityState_ = ChunksAvailabilityState::kSafe;
		copiesInStats_ = 0;
		count++;
		updateStats(false);
#endif
	}

	// Highest id of the chunk's goal
	// This function is preserved only for backward compatibility of metadata checksums
	// and shouldn't be used anywhere else.
	uint8_t highestIdGoal() const {
		return goalCounters_.highestIdGoal();
	}

	// Number of files this chunk belongs to
	uint32_t fileCount() const {
		return goalCounters_.fileCount();
	}

	// Called when this chunk becomes a part of a file with the given goal
	void addFileWithGoal(uint8_t goal) {
#ifndef METARESTORE
		removeFromStats();
#endif
		goalCounters_.addFile(goal);
#ifndef METARESTORE
		updateStats(false);
#endif
	}

	// Called when a file that this chunk belongs to is removed
	void removeFileWithGoal(uint8_t goal) {
#ifndef METARESTORE
		removeFromStats();
#endif
		goalCounters_.removeFile(goal);
#ifndef METARESTORE
		updateStats(false);
#endif
	}

	// Called when a file that this chunk belongs to changes goal
	void changeFileGoal(uint8_t prevGoal, uint8_t newGoal) {
#ifndef METARESTORE
		removeFromStats();
#endif
		goalCounters_.changeFileGoal(prevGoal, newGoal);
#ifndef METARESTORE
		updateStats(false);
#endif
	}

#ifndef METARESTORE
	Goal getGoal() {
		// Do not search for empty goalCounters in cache
		if (goalCounters_.size() == 0) {
			return Goal();
		}

		auto it = goalCache.find(goalCounters_);
		if (it != goalCache.end()) {
			return it->second;
		}

		Goal result;
		int prev_goal = -1;
		for (auto counter : goalCounters_) {
			const Goal &goal = fs_get_goal_definition(counter.goal);
			if (prev_goal != (int)counter.goal) {
				result.mergeIn(goal);
				prev_goal = counter.goal;
			}
		}

		goalCache.insert(goalCounters_, result);
		return result;
	}

	// This method should be called when a chunk is removed
	void freeStats() {
		count--;
		removeFromStats();
	}

	// Updates statistics of all chunks
	void updateStats(bool remove_from_stats = true) {
		int oldAllMissingParts = allMissingParts_;

		if (remove_from_stats) {
			removeFromStats();
		}

		Goal g = getGoal();

		ChunkCopiesCalculator all(g);

		for (const auto &part : parts) {
			if (!part.is_valid()) {
				continue;
			}
			all.addPart(part.type, csdb_find(part.csid)->label);
		}

		all.optimize();

		allFullCopies_ = std::min(kMaxStatCount, all.getFullCopiesCount());
		allAvailabilityState_ = all.getState();
		allMissingParts_ = std::min(kMaxStatCount, all.countPartsToRecover());
		allRedundantParts_ = std::min(kMaxStatCount, all.countPartsToRemove());
		copiesInStats_ = std::min(kMaxStatCount, ChunkCopiesCalculator::getFullCopiesCount(g));

		/* Enqueue a chunk as endangered only if:
		 * 1. Endangered chunks prioritization is on (limit > 0)
		 * 2. Limit of endangered chunks in queue is not reached
		 * 3. Chunk has more missing parts than it used to
		 * 4. Chunk is endangered
		 * 5. It is not already in queue
		 * By checking conditions below we assert no repetitions in endangered queue. */
		if (gEndangeredChunksServingLimit > 0
		        && endangeredChunks.size() < gEndangeredChunksMaxCapacity
		        && allMissingParts_ > oldAllMissingParts
		        && allAvailabilityState_ == ChunksAvailabilityState::kEndangered
		        && !inEndangeredQueue) {
			inEndangeredQueue = 1;
			endangeredChunks.push_back(this);
		}

		addToStats();
	}
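
	// Note on saturation: the per-chunk counters above are 4-bit fields, so
	// they are clamped to kMaxStatCount (15) with std::min before being
	// stored. A chunk that needs, say, 20 parts recovered is recorded as 15;
	// the global stats matrices therefore undercount only in such extreme
	// cases.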

	bool isSafe() const {
		return allAvailabilityState_ == ChunksAvailabilityState::kSafe;
	}

	bool isEndangered() const {
		return allAvailabilityState_ == ChunksAvailabilityState::kEndangered;
	}

	bool isLost() const {
		return allAvailabilityState_ == ChunksAvailabilityState::kLost;
	}

	bool isWritable() {
		return !isLost();
	}

	int countMissingParts() const {
		return allMissingParts_;
	}

	int countRedundantParts() const {
		return allRedundantParts_;
	}

	uint8_t getFullCopiesCount() const {
		return allFullCopies_;
	}

	bool isLocked() const {
		return lockedto >= eventloop_time();
	}

	void markCopyAsHavingWrongVersion(ChunkPart &part) {
		part.state = ChunkPart::INVALID;
		updateStats();
	}

	void invalidateCopy(ChunkPart &part) {
		part.state = ChunkPart::INVALID;
		part.version = 0;
		updateStats();
	}

	void deleteCopy(ChunkPart &part) {
		part.state = ChunkPart::DEL;
		updateStats();
	}

private:
	ChunksAvailabilityState::State allCopiesState() const {
		return static_cast<ChunksAvailabilityState::State>(allAvailabilityState_);
	}

	void removeFromStats() {
		int prev_goal = -1;
		for (const auto &counter : goalCounters_) {
			if (prev_goal == (int)counter.goal) {
				continue;
			}
			prev_goal = counter.goal;
			allChunksAvailability.removeChunk(counter.goal, allCopiesState());
			allChunksReplicationState.removeChunk(counter.goal, allMissingParts_, allRedundantParts_);
		}

		uint8_t limitedGoal = std::min<uint8_t>(CHUNK_MATRIX_SIZE - 1, copiesInStats_);
		uint8_t limitedAll = std::min<uint8_t>(CHUNK_MATRIX_SIZE - 1, allFullCopies_);
		allFullChunkCopies[limitedGoal][limitedAll]--;
	}

	void addToStats() {
		int prev_goal = -1;
		for (const auto &counter : goalCounters_) {
			if (prev_goal == (int)counter.goal) {
				continue;
			}
			prev_goal = counter.goal;
			allChunksAvailability.addChunk(counter.goal, allCopiesState());
			allChunksReplicationState.addChunk(counter.goal, allMissingParts_, allRedundantParts_);
		}

		uint8_t limitedGoal = std::min<uint8_t>(CHUNK_MATRIX_SIZE - 1, copiesInStats_);
		uint8_t limitedAll = std::min<uint8_t>(CHUNK_MATRIX_SIZE - 1, allFullCopies_);
		allFullChunkCopies[limitedGoal][limitedAll]++;
	}
#endif
};

constexpr int Chunk::kMaxStatCount;

#ifndef METARESTORE

std::deque<Chunk *> Chunk::endangeredChunks;
GoalCache Chunk::goalCache(10000);
ChunksAvailabilityState Chunk::allChunksAvailability;
ChunksReplicationState Chunk::allChunksReplicationState;
uint64_t Chunk::count;
uint64_t Chunk::allFullChunkCopies[CHUNK_MATRIX_SIZE][CHUNK_MATRIX_SIZE];
#endif

#define CHUNK_BUCKET_SIZE 20000
struct chunk_bucket {
	Chunk bucket[CHUNK_BUCKET_SIZE];
	uint32_t firstfree;
	chunk_bucket *next;
};
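
// Chunks are pool-allocated: chunk_malloc() below carves Chunk objects out of
// 20000-element buckets and recycles freed ones through an intrusive free
// list (Chunk::next doubles as the free-list link). A sketch of the usage
// pattern, as it appears later in this file:
//
//   Chunk *c = chunk_malloc();   // reuses a freed Chunk or takes a bucket slot
//   ...
//   chunk_free(c);               // pushes c onto gChunksMetadata->chfreehead
//
// The buckets themselves are only released in ~ChunksMetadata().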

namespace {
struct ChunksMetadata {
	// chunks
	chunk_bucket *cbhead;
	Chunk *chfreehead;
	Chunk *chunkhash[HASHSIZE];
	uint64_t lastchunkid;
	Chunk *lastchunkptr;

	// other chunks metadata information
	uint64_t nextchunkid;
	uint64_t chunksChecksum;
	uint64_t chunksChecksumRecalculated;
	uint32_t checksumRecalculationPosition;

	ChunksMetadata() :
			cbhead{},
			chfreehead{},
			chunkhash{},
			lastchunkid{},
			lastchunkptr{},
			nextchunkid{1},
			chunksChecksum{},
			chunksChecksumRecalculated{},
			checksumRecalculationPosition{0} {
	}

	~ChunksMetadata() {
		chunk_bucket *cbn;
		for (chunk_bucket *cb = cbhead; cb; cb = cbn) {
			cbn = cb->next;
			delete cb;
		}
	}
};
} // anonymous namespace

static ChunksMetadata *gChunksMetadata;

#define LOCKTIMEOUT 120
#define UNUSED_DELETE_TIMEOUT (86400*7)

#ifndef METARESTORE

static Chunk *gCurrentChunkInZombieLoop = nullptr;

class ReplicationDelayInfo {
public:
	ReplicationDelayInfo()
	    : disconnectedServers_(0),
	      timestamp_() {}

	void serverDisconnected() {
		refresh();
		++disconnectedServers_;
		timestamp_ = eventloop_time() + gOperationsDelayDisconnect;
	}

	void serverConnected() {
		refresh();
		if (disconnectedServers_ > 0) {
			--disconnectedServers_;
		}
	}

	bool replicationAllowed(int missingCopies) {
		refresh();
		return missingCopies > disconnectedServers_;
	}

private:
	uint16_t disconnectedServers_;
	uint32_t timestamp_;

	void refresh() {
		if (eventloop_time() > timestamp_) {
			disconnectedServers_ = 0;
		}
	}
};
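
// Why this delays replication: recently disconnected servers may come back,
// and replicating every chunk they held would be wasted work. A rough usage
// sketch (the real call sites are in the replication loop, outside this
// excerpt):
//
//   // chunk lost 2 copies, 1 server disconnected recently:
//   replicationDelayInfoForAll.replicationAllowed(2);  // true  - start now
//   // chunk lost 1 copy, 1 server disconnected recently:
//   replicationDelayInfoForAll.replicationAllowed(1);  // false - wait it out
//
// The counter decays to zero gOperationsDelayDisconnect seconds after the
// last disconnection (see refresh()).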

/*
 * Information about recently disconnected and connected servers
 * necessary for replication to unlabeled servers.
 */
static ReplicationDelayInfo replicationDelayInfoForAll;

/*
 * Information about recently disconnected and connected servers
 * necessary for replication to servers with a specified label.
 */
static std::unordered_map<MediaLabel, ReplicationDelayInfo, MediaLabel::hash> replicationDelayInfoForLabel;

struct job_info {
	uint32_t del_invalid;
	uint32_t del_unused;
	uint32_t del_diskclean;
	uint32_t del_overgoal;
	uint32_t copy_undergoal;
};

struct loop_info {
	job_info done, notdone;
	uint32_t copy_rebalance;
};

static loop_info chunksinfo = {{0,0,0,0,0},{0,0,0,0,0},0};
static uint32_t chunksinfo_loopstart = 0, chunksinfo_loopend = 0;

static uint32_t stats_deletions = 0;
static uint32_t stats_replications = 0;

void chunk_stats(uint32_t *del, uint32_t *repl) {
	*del = stats_deletions;
	*repl = stats_replications;
	stats_deletions = 0;
	stats_replications = 0;
}

#endif // ! METARESTORE

static uint64_t chunk_checksum(const Chunk *c) {
	if (c == nullptr || c->fileCount() == 0) {
		// We treat chunks with fileCount=0 as non-existent, so that we don't have to notify shadow
		// masters when we remove them from our structures.
		return 0;
	}
	uint64_t checksum = 64517419147637ULL;
	// Only the highest id goal is taken into the checksum, for compatibility reasons
	hashCombine(checksum, c->chunkid, c->version, c->lockedto, c->highestIdGoal(), c->fileCount());
	return checksum;
}
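
// The per-chunk checksum deliberately covers only metadata that shadow
// masters must agree on (id, version, lock expiry, highest goal id, file
// count); chunkserver locations are volatile and excluded. All per-chunk
// values are folded into the global gChunksMetadata->chunksChecksum, so a
// change to a single chunk is applied as remove-old / add-new, as done in
// chunk_update_checksum() below.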

static void chunk_checksum_add_to_background(Chunk *ch) {
	if (!ch) {
		return;
	}
	removeFromChecksum(gChunksMetadata->chunksChecksum, ch->checksum);
	ch->checksum = chunk_checksum(ch);
	addToChecksum(gChunksMetadata->chunksChecksumRecalculated, ch->checksum);
	addToChecksum(gChunksMetadata->chunksChecksum, ch->checksum);
}

static void chunk_update_checksum(Chunk *ch) {
	if (!ch) {
		return;
	}
	if (HASHPOS(ch->chunkid) < gChunksMetadata->checksumRecalculationPosition) {
		removeFromChecksum(gChunksMetadata->chunksChecksumRecalculated, ch->checksum);
	}
	removeFromChecksum(gChunksMetadata->chunksChecksum, ch->checksum);
	ch->checksum = chunk_checksum(ch);
	if (HASHPOS(ch->chunkid) < gChunksMetadata->checksumRecalculationPosition) {
		lzfs_silent_syslog(LOG_DEBUG, "master.fs.checksum.changing_recalculated_chunk");
		addToChecksum(gChunksMetadata->chunksChecksumRecalculated, ch->checksum);
	} else {
		lzfs_silent_syslog(LOG_DEBUG, "master.fs.checksum.changing_not_recalculated_chunk");
	}
	addToChecksum(gChunksMetadata->chunksChecksum, ch->checksum);
}

/*!
 * \brief Update chunks checksum in the background.
 * \param speedLimit Limit of chunks processed per call.
 * \return Status telling whether all chunks have been updated yet.
 */
ChecksumRecalculationStatus chunks_update_checksum_a_bit(uint32_t speedLimit) {
	if (gChunksMetadata->checksumRecalculationPosition == 0) {
		gChunksMetadata->chunksChecksumRecalculated = CHECKSUMSEED;
	}
	uint32_t recalculated = 0;
	while (gChunksMetadata->checksumRecalculationPosition < HASHSIZE) {
		Chunk *c;
		for (c = gChunksMetadata->chunkhash[gChunksMetadata->checksumRecalculationPosition]; c; c = c->next) {
			chunk_checksum_add_to_background(c);
			++recalculated;
		}
		++gChunksMetadata->checksumRecalculationPosition;
		if (recalculated >= speedLimit) {
			return ChecksumRecalculationStatus::kInProgress;
		}
	}
	// Recalculating chunks checksum finished
	gChunksMetadata->checksumRecalculationPosition = 0;
	if (gChunksMetadata->chunksChecksum != gChunksMetadata->chunksChecksumRecalculated) {
		lzfs_pretty_syslog(LOG_WARNING, "Chunks metadata checksum mismatch found, replacing with a new value.");
		lzfs_silent_syslog(LOG_DEBUG, "master.fs.checksum.mismatch");
		gChunksMetadata->chunksChecksum = gChunksMetadata->chunksChecksumRecalculated;
	}
	return ChecksumRecalculationStatus::kDone;
}
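
// The recalculation is resumable: checksumRecalculationPosition remembers the
// next hash bucket to visit, and chunk_update_checksum() keeps the partially
// rebuilt chunksChecksumRecalculated consistent for chunks modified mid-scan.
// A plausible driver, assuming some periodic scheduler calls it (hypothetical
// loop, not part of this file):
//
//   while (chunks_update_checksum_a_bit(10000) ==
//          ChecksumRecalculationStatus::kInProgress) {
//       // yield to the event loop between batches of ~10000 chunks
//   }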

static void chunk_recalculate_checksum() {
	gChunksMetadata->chunksChecksum = CHECKSUMSEED;
	for (int i = 0; i < HASHSIZE; ++i) {
		for (Chunk *ch = gChunksMetadata->chunkhash[i]; ch; ch = ch->next) {
			ch->checksum = chunk_checksum(ch);
			addToChecksum(gChunksMetadata->chunksChecksum, ch->checksum);
		}
	}
}

uint64_t chunk_checksum(ChecksumMode mode) {
	uint64_t checksum = 46586918175221;
	addToChecksum(checksum, gChunksMetadata->nextchunkid);
	if (mode == ChecksumMode::kForceRecalculate) {
		chunk_recalculate_checksum();
	}
	addToChecksum(checksum, gChunksMetadata->chunksChecksum);
	return checksum;
}

static inline Chunk *chunk_malloc() {
	chunk_bucket *cb;
	Chunk *ret;
	if (gChunksMetadata->chfreehead) {
		ret = gChunksMetadata->chfreehead;
		gChunksMetadata->chfreehead = ret->next;
		ret->clear();
		return ret;
	}
	if (gChunksMetadata->cbhead == NULL || gChunksMetadata->cbhead->firstfree == CHUNK_BUCKET_SIZE) {
		cb = new chunk_bucket;
		cb->next = gChunksMetadata->cbhead;
		cb->firstfree = 0;
		gChunksMetadata->cbhead = cb;
	}
	ret = (gChunksMetadata->cbhead->bucket) + (gChunksMetadata->cbhead->firstfree);
	gChunksMetadata->cbhead->firstfree++;
	ret->clear();
	return ret;
}

#ifndef METARESTORE
static inline void chunk_free(Chunk *p) {
	p->next = gChunksMetadata->chfreehead;
	gChunksMetadata->chfreehead = p;
	p->inEndangeredQueue = 0;
}
#endif /* ! METARESTORE */

Chunk *chunk_new(uint64_t chunkid, uint32_t chunkversion) {
	uint32_t chunkpos = HASHPOS(chunkid);
	Chunk *newchunk;
	newchunk = chunk_malloc();
	newchunk->next = gChunksMetadata->chunkhash[chunkpos];
	gChunksMetadata->chunkhash[chunkpos] = newchunk;
	newchunk->chunkid = chunkid;
	newchunk->version = chunkversion;
	gChunksMetadata->lastchunkid = chunkid;
	gChunksMetadata->lastchunkptr = newchunk;
	chunk_update_checksum(newchunk);
	return newchunk;
}

#ifndef METARESTORE
void chunk_emergency_increase_version(Chunk *c) {
	assert(c->isWritable());
	uint32_t i = 0;
	for (auto &part : c->parts) {
		if (part.is_valid()) {
			if (!part.is_busy()) {
				part.mark_busy();
			}
			part.version = c->version + 1;
			matocsserv_send_setchunkversion(part.server(), c->chunkid, c->version + 1, c->version,
					part.type);
			i++;
		}
	}
	c->interrupted = 0;
	c->operation = Chunk::SET_VERSION;
	c->version++;
	chunk_update_checksum(c);
	fs_incversion(c->chunkid);
}

void chunk_handle_disconnected_copies(Chunk *c) {
	auto it = std::remove_if(c->parts.begin(), c->parts.end(), [](const ChunkPart &part) {
		return csdb_find(part.csid)->eptr == nullptr;
	});
	bool lost_copy_found = it != c->parts.end();

	if (lost_copy_found) {
		c->parts.erase(it, c->parts.end());
		c->needverincrease = 1;
		c->updateStats();
	}

	if (lost_copy_found && c->operation != Chunk::NONE) {
		bool any_copy_busy = std::any_of(c->parts.begin(), c->parts.end(), [](const ChunkPart &part) {
			return part.is_busy();
		});
		if (any_copy_busy) {
			c->interrupted = 1;
		} else {
			if (c->isWritable()) {
				chunk_emergency_increase_version(c);
			} else {
				matoclserv_chunk_status(c->chunkid, LIZARDFS_ERROR_NOTDONE);
				c->operation = Chunk::NONE;
			}
		}
	}
}
#endif

Chunk *chunk_find(uint64_t chunkid) {
	uint32_t chunkpos = HASHPOS(chunkid);
	Chunk *chunkit;
	if (gChunksMetadata->lastchunkid == chunkid) {
		return gChunksMetadata->lastchunkptr;
	}
	for (chunkit = gChunksMetadata->chunkhash[chunkpos]; chunkit; chunkit = chunkit->next) {
		if (chunkit->chunkid == chunkid) {
			gChunksMetadata->lastchunkid = chunkid;
			gChunksMetadata->lastchunkptr = chunkit;
#ifndef METARESTORE
			chunk_handle_disconnected_copies(chunkit);
#endif // METARESTORE
			return chunkit;
		}
	}
	return NULL;
}
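
// chunk_find() keeps a one-entry cache (lastchunkid/lastchunkptr): lookups
// for the same chunk tend to repeat within a single client operation, and a
// cached hit skips both the hash-chain walk and the disconnected-copies
// check. The cache is invalidated in chunk_delete() and in
// chunk_server_disconnected() below.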

#ifndef METARESTORE
void chunk_delete(Chunk *c) {
	if (gChunksMetadata->lastchunkptr == c) {
		gChunksMetadata->lastchunkid = 0;
		gChunksMetadata->lastchunkptr = NULL;
	}
	c->freeStats();
	chunk_free(c);
}

uint32_t chunk_count(void) {
	return Chunk::count;
}

void chunk_info(uint32_t *allchunks, uint32_t *allcopies, uint32_t *regularvalidcopies) {
	*allchunks = Chunk::count;
	*allcopies = 0;
	*regularvalidcopies = 0;
	for (int actualCopies = 1; actualCopies < CHUNK_MATRIX_SIZE; actualCopies++) {
		uint32_t ag = 0;
		for (int expectedCopies = 0; expectedCopies < CHUNK_MATRIX_SIZE; expectedCopies++) {
			ag += Chunk::allFullChunkCopies[expectedCopies][actualCopies];
		}
		*allcopies += ag * actualCopies;
	}
}

uint32_t chunk_get_missing_count(void) {
	uint32_t res = 0;
	for (uint8_t goal = GoalId::kMin; goal <= GoalId::kMax; ++goal) {
		res += Chunk::allChunksAvailability.lostChunks(goal);
	}
	return res;
}

void chunk_store_chunkcounters(uint8_t *buff, uint8_t matrixid) {
	if (matrixid == MATRIX_ALL_COPIES) {
		for (int i = 0; i < CHUNK_MATRIX_SIZE; i++) {
			for (int j = 0; j < CHUNK_MATRIX_SIZE; j++) {
				put32bit(&buff, Chunk::allFullChunkCopies[i][j]);
			}
		}
	} else {
		memset(buff, 0, CHUNK_MATRIX_SIZE * CHUNK_MATRIX_SIZE * sizeof(uint32_t));
	}
}
#endif

/// Updates the chunk's goal after a file goal has been changed.
int chunk_change_file(uint64_t chunkid, uint8_t prevgoal, uint8_t newgoal) {
	Chunk *c;
	if (prevgoal == newgoal) {
		return LIZARDFS_STATUS_OK;
	}
	c = chunk_find(chunkid);
	if (c == NULL) {
		return LIZARDFS_ERROR_NOCHUNK;
	}
	try {
		c->changeFileGoal(prevgoal, newgoal);
	} catch (Exception &ex) {
		lzfs_pretty_syslog(LOG_WARNING, "chunk_change_file: %s", ex.what());
		return LIZARDFS_ERROR_CHUNKLOST;
	}
	chunk_update_checksum(c);
	return LIZARDFS_STATUS_OK;
}

/// Updates the chunk's goal after a file with goal `goal' has been removed.
static inline int chunk_delete_file_int(Chunk *c, uint8_t goal) {
	try {
		c->removeFileWithGoal(goal);
	} catch (Exception &ex) {
		lzfs_pretty_syslog(LOG_WARNING, "chunk_delete_file_int: %s", ex.what());
		return LIZARDFS_ERROR_CHUNKLOST;
	}
	chunk_update_checksum(c);
	return LIZARDFS_STATUS_OK;
}

/// Updates the chunk's goal after a file with goal `goal' has been added.
static inline int chunk_add_file_int(Chunk *c, uint8_t goal) {
	try {
		c->addFileWithGoal(goal);
	} catch (Exception &ex) {
		lzfs_pretty_syslog(LOG_WARNING, "chunk_add_file_int: %s", ex.what());
		return LIZARDFS_ERROR_CHUNKLOST;
	}
	chunk_update_checksum(c);
	return LIZARDFS_STATUS_OK;
}

int chunk_delete_file(uint64_t chunkid, uint8_t goal) {
	Chunk *c;
	c = chunk_find(chunkid);
	if (c == NULL) {
		return LIZARDFS_ERROR_NOCHUNK;
	}
	return chunk_delete_file_int(c, goal);
}

int chunk_add_file(uint64_t chunkid, uint8_t goal) {
	Chunk *c;
	c = chunk_find(chunkid);
	if (c == NULL) {
		return LIZARDFS_ERROR_NOCHUNK;
	}
	return chunk_add_file_int(c, goal);
}

int chunk_can_unlock(uint64_t chunkid, uint32_t lockid) {
	Chunk *c;
	c = chunk_find(chunkid);
	if (c == NULL) {
		return LIZARDFS_ERROR_NOCHUNK;
	}
	if (lockid == 0) {
		// lockid == 0 -> force unlock
		return LIZARDFS_STATUS_OK;
	}
	// We will let the client unlock the chunk even if c->lockedto < eventloop_time(),
	// provided it supplies the lockid that was used to lock the chunk -- this means that nobody
	// else used this chunk since it was locked (operations like truncate or replicate
	// would remove such a stale lock before modifying the chunk).
	if (c->lockid == lockid) {
		return LIZARDFS_STATUS_OK;
	} else if (c->lockedto == 0) {
		return LIZARDFS_ERROR_NOTLOCKED;
	} else {
		return LIZARDFS_ERROR_WRONGLOCKID;
	}
}

int chunk_unlock(uint64_t chunkid) {
	Chunk *c;
	c = chunk_find(chunkid);
	if (c == NULL) {
		return LIZARDFS_ERROR_NOCHUNK;
	}
	// Don't remove lockid, so that a retransmission of the FUSE_CHUNK_UNLOCK message is safely accepted
	c->lockedto = 0;
	chunk_update_checksum(c);
	return LIZARDFS_STATUS_OK;
}

#ifndef METARESTORE

int chunk_invalidate_goal_cache() {
	Chunk::goalCache.clear();
	return LIZARDFS_STATUS_OK;
}

bool chunk_has_only_invalid_copies(uint64_t chunkid) {
	if (chunkid == 0) {
		return false;
	}
	Chunk *c = chunk_find(chunkid);
	if (c == NULL || !c->isLost()) {
		return false;
	}
	// Chunk is lost, so it can only have INVALID or DEL copies.
	// Return true if there is at least one INVALID.
	return std::any_of(c->parts.begin(), c->parts.end(), [](const ChunkPart &part) {
		return part.state == ChunkPart::INVALID;
	});
}

int chunk_get_fullcopies(uint64_t chunkid, uint8_t *vcopies) {
	Chunk *c;
	*vcopies = 0;
	c = chunk_find(chunkid);
	if (c == NULL) {
		return LIZARDFS_ERROR_NOCHUNK;
	}

	*vcopies = c->getFullCopiesCount();

	return LIZARDFS_STATUS_OK;
}

int chunk_get_partstomodify(uint64_t chunkid, int &recover, int &remove) {
	Chunk *c;
	recover = 0;
	remove = 0;
	c = chunk_find(chunkid);
	if (c == NULL) {
		return LIZARDFS_ERROR_NOCHUNK;
	}
	recover = c->countMissingParts();
	remove = c->countRedundantParts();
	return LIZARDFS_STATUS_OK;
}

uint8_t chunk_multi_modify(uint64_t ochunkid, uint32_t *lockid, uint8_t goal,
		bool usedummylockid, bool quota_exceeded, uint8_t *opflag, uint64_t *nchunkid,
		uint32_t min_server_version = 0) {
	Chunk *c = NULL;
	if (ochunkid == 0) { // new chunk
		if (quota_exceeded) {
			return LIZARDFS_ERROR_QUOTA;
		}
		auto serversWithChunkTypes = matocsserv_getservers_for_new_chunk(goal, min_server_version);
		if (serversWithChunkTypes.empty()) {
			uint16_t uscount, tscount;
			double minusage, maxusage;
			matocsserv_usagedifference(&minusage, &maxusage, &uscount, &tscount);
			if ((uscount > 0) && (eventloop_time() > (starttime + 600))) { // if there are chunkservers and it's at least 10 minutes after start, it means that there is no space left
				return LIZARDFS_ERROR_NOSPACE;
			} else {
				return LIZARDFS_ERROR_NOCHUNKSERVERS;
			}
		}
		ChunkCopiesCalculator calculator(fs_get_goal_definition(goal));
		for (const auto &server_with_type : serversWithChunkTypes) {
			calculator.addPart(server_with_type.second, MediaLabel::kWildcard);
		}
		calculator.evalRedundancyLevel();
		if (!calculator.isSafeEnoughToWrite(gRedundancyLevel)) {
			return LIZARDFS_ERROR_NOCHUNKSERVERS;
		}
		c = chunk_new(gChunksMetadata->nextchunkid++, 1);
		c->interrupted = 0;
		c->operation = Chunk::CREATE;
		chunk_add_file_int(c, goal);
		for (const auto &server_with_type : serversWithChunkTypes) {
			c->parts.push_back(ChunkPart(matocsserv_get_csdb(server_with_type.first)->csid,
					ChunkPart::BUSY, c->version, server_with_type.second));
			matocsserv_send_createchunk(server_with_type.first, c->chunkid, server_with_type.second,
					c->version);
		}
		c->updateStats();
		*opflag = 1;
		*nchunkid = c->chunkid;
	} else {
		Chunk *oc = chunk_find(ochunkid);
		if (oc == NULL) {
			return LIZARDFS_ERROR_NOCHUNK;
		}
		if (*lockid != 0 && *lockid != oc->lockid) {
			if (oc->lockid == 0 || oc->lockedto == 0) {
				// Lock was removed by some chunk operation or by a different client
				return LIZARDFS_ERROR_NOTLOCKED;
			} else {
				return LIZARDFS_ERROR_WRONGLOCKID;
			}
		}
		if (*lockid == 0 && oc->isLocked()) {
			return LIZARDFS_ERROR_LOCKED;
		}
		if (!oc->isWritable()) {
			return LIZARDFS_ERROR_CHUNKLOST;
		}
		ChunkCopiesCalculator calculator(oc->getGoal());
		for (auto &part : oc->parts) {
			calculator.addPart(part.type, MediaLabel::kWildcard);
		}
		calculator.evalRedundancyLevel();
		if (!calculator.isSafeEnoughToWrite(gRedundancyLevel)) {
			return LIZARDFS_ERROR_NOCHUNKSERVERS;
		}

		if (oc->fileCount() == 1) { // refcount==1
			*nchunkid = ochunkid;
			c = oc;
			if (c->operation != Chunk::NONE) {
				return LIZARDFS_ERROR_CHUNKBUSY;
			}
			if (c->needverincrease) {
				assert(c->isWritable());
				for (auto &part : c->parts) {
					if (part.is_valid()) {
						if (!part.is_busy()) {
							part.mark_busy();
						}
						part.version = c->version + 1;
						matocsserv_send_setchunkversion(part.server(), ochunkid, c->version + 1, c->version,
								part.type);
					}
				}
				c->interrupted = 0;
				c->operation = Chunk::SET_VERSION;
				c->version++;
				*opflag = 1;
			} else {
				*opflag = 0;
			}
		} else {
			if (oc->fileCount() == 0) { // it's a serious structure error
				lzfs_pretty_syslog(LOG_WARNING, "serious structure inconsistency: (chunkid:%016" PRIX64 ")", ochunkid);
				return LIZARDFS_ERROR_CHUNKLOST; // ERROR_STRUCTURE
			}
			if (quota_exceeded) {
				return LIZARDFS_ERROR_QUOTA;
			}
			assert(oc->isWritable());
			c = chunk_new(gChunksMetadata->nextchunkid++, 1);
			c->interrupted = 0;
			c->operation = Chunk::DUPLICATE;
			chunk_delete_file_int(oc, goal);
			chunk_add_file_int(c, goal);
			for (const auto &old_part : oc->parts) {
				if (old_part.is_valid()) {
					c->parts.push_back(ChunkPart(old_part.csid, ChunkPart::BUSY, c->version, old_part.type));
					matocsserv_send_duplicatechunk(old_part.server(), c->chunkid, c->version, old_part.type,
							oc->chunkid, oc->version);
				}
			}
			c->updateStats();
			*nchunkid = c->chunkid;
			*opflag = 1;
		}
	}

	c->lockedto = eventloop_time() + LOCKTIMEOUT;
	if (*lockid == 0) {
		if (usedummylockid) {
			*lockid = 1;
		} else {
			*lockid = 2 + rnd_ranged<uint32_t>(0xFFFFFFF0); // some random number greater than 1
		}
	}
	c->lockid = *lockid;
	chunk_update_checksum(c);
	return LIZARDFS_STATUS_OK;
}
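
// chunk_multi_modify() above is the write-entry point and takes one of three
// paths, summarized here for orientation:
//   1. ochunkid == 0: allocate a brand new chunk (Chunk::CREATE) on servers
//      chosen by matocsserv_getservers_for_new_chunk().
//   2. fileCount() == 1: the chunk is referenced by a single file, so it is
//      modified in place; if needed, the version is bumped on every valid
//      copy (Chunk::SET_VERSION).
//   3. fileCount() > 1: the chunk is shared between files (e.g. after a
//      snapshot), so a copy-on-write duplicate is created (Chunk::DUPLICATE)
//      and the file is moved over to it.
// In every path the chunk ends up locked for LOCKTIMEOUT (120) seconds.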

uint8_t chunk_multi_truncate(uint64_t ochunkid, uint32_t lockid, uint32_t length,
		uint8_t goal, bool denyTruncatingParityParts, bool quota_exceeded, uint64_t *nchunkid) {
	Chunk *oc, *c;

	c = NULL;
	oc = chunk_find(ochunkid);
	if (oc == NULL) {
		return LIZARDFS_ERROR_NOCHUNK;
	}
	if (!oc->isWritable()) {
		return LIZARDFS_ERROR_CHUNKLOST;
	}
	if (oc->isLocked() && (lockid == 0 || lockid != oc->lockid)) {
		return LIZARDFS_ERROR_LOCKED;
	}
	if (denyTruncatingParityParts) {
		for (const auto &part : oc->parts) {
			if (slice_traits::isParityPart(part.type)) {
				return LIZARDFS_ERROR_NOTPOSSIBLE;
			}
		}
	}
	if (oc->fileCount() == 1) { // refcount==1
		*nchunkid = ochunkid;
		c = oc;
		if (c->operation != Chunk::NONE) {
			return LIZARDFS_ERROR_CHUNKBUSY;
		}
		assert(c->isWritable());
		for (auto &part : c->parts) {
			if (part.is_valid()) {
				if (!part.is_busy()) {
					part.mark_busy();
				}
				part.version = c->version + 1;
				uint32_t chunkTypeLength =
						slice_traits::chunkLengthToChunkPartLength(part.type, length);
				matocsserv_send_truncatechunk(part.server(), ochunkid, part.type, chunkTypeLength,
						c->version + 1, c->version);
			}
		}
		c->interrupted = 0;
		c->operation = Chunk::TRUNCATE;
		c->version++;
	} else {
		if (oc->fileCount() == 0) { // it's a serious structure error
			lzfs_pretty_syslog(LOG_WARNING, "serious structure inconsistency: (chunkid:%016" PRIX64 ")", ochunkid);
			return LIZARDFS_ERROR_CHUNKLOST; // ERROR_STRUCTURE
		}
		if (quota_exceeded) {
			return LIZARDFS_ERROR_QUOTA;
		}

		assert(oc->isWritable());
		c = chunk_new(gChunksMetadata->nextchunkid++, 1);
		c->interrupted = 0;
		c->operation = Chunk::DUPTRUNC;
		chunk_delete_file_int(oc, goal);
		chunk_add_file_int(c, goal);
		for (const auto &old_part : oc->parts) {
			if (old_part.is_valid()) {
				c->parts.push_back(ChunkPart(old_part.csid, ChunkPart::BUSY, c->version, old_part.type));
				matocsserv_send_duptruncchunk(old_part.server(), c->chunkid, c->version,
						old_part.type, oc->chunkid, oc->version,
						slice_traits::chunkLengthToChunkPartLength(old_part.type, length));
			}
		}
		c->updateStats();
		*nchunkid = c->chunkid;
	}

	c->lockedto = (uint32_t)eventloop_time() + LOCKTIMEOUT;
	c->lockid = lockid;
	chunk_update_checksum(c);
	return LIZARDFS_STATUS_OK;
}
#endif // ! METARESTORE

uint8_t chunk_apply_modification(uint32_t ts, uint64_t oldChunkId, uint32_t lockid, uint8_t goal,
		bool doIncreaseVersion, uint64_t *newChunkId) {
	Chunk *c;
	if (oldChunkId == 0) { // new chunk
		c = chunk_new(gChunksMetadata->nextchunkid++, 1);
		chunk_add_file_int(c, goal);
	} else {
		Chunk *oc = chunk_find(oldChunkId);
		if (oc == NULL) {
			return LIZARDFS_ERROR_NOCHUNK;
		}
		if (oc->fileCount() == 0) { // refcount == 0
			lzfs_pretty_syslog(LOG_WARNING,
					"serious structure inconsistency: (chunkid:%016" PRIX64 ")", oldChunkId);
			return LIZARDFS_ERROR_CHUNKLOST; // ERROR_STRUCTURE
		} else if (oc->fileCount() == 1) { // refcount == 1
			c = oc;
			if (doIncreaseVersion) {
				c->version++;
			}
		} else {
			c = chunk_new(gChunksMetadata->nextchunkid++, 1);
			chunk_delete_file_int(oc, goal);
			chunk_add_file_int(c, goal);
		}
	}
	c->lockedto = ts + LOCKTIMEOUT;
	c->lockid = lockid;
	chunk_update_checksum(c);
	*newChunkId = c->chunkid;
	return LIZARDFS_STATUS_OK;
}

#ifndef METARESTORE
int chunk_repair(uint8_t goal, uint64_t ochunkid, uint32_t *nversion, uint8_t correct_only) {
	uint32_t best_version;
	Chunk *c;

	*nversion = 0;
	if (ochunkid == 0) {
		return 0; // not changed
	}

	c = chunk_find(ochunkid);
	if (c == NULL) { // no such chunk - erase (nchunkid already is 0 - so just return with "changed" status)
		if (correct_only == 1) { // don't erase if the correct-only flag is set
			return 0;
		} else {
			return 1;
		}
	}
	if (c->isLocked()) { // can't repair locked chunks - but if it's locked, then it likely doesn't need to be repaired
		return 0;
	}

	// Calculators are sorted by decreasing keys, so the highest version will be first.
	std::map<uint32_t, ChunkCopiesCalculator, std::greater<uint32_t>> calculators;
	best_version = 0;
	for (const auto &part : c->parts) {
		// ignore parts which are being deleted
		if (part.state != ChunkPart::DEL) {
			ChunkCopiesCalculator &calculator = calculators[part.version];
			calculator.addPart(part.type, matocsserv_get_label(part.server()));
		}
	}
	// Find the best version which can be recovered;
	// calculators are sorted by decreasing keys, so the highest version comes first.
	for (auto &version_and_calculator : calculators) {
		uint32_t version = version_and_calculator.first;
		ChunkCopiesCalculator &calculator = version_and_calculator.second;
		calculator.optimize();
		// calculator.isRecoveryPossible() won't work below, because the target goal is empty.
		if (calculator.getFullCopiesCount() > 0) {
			best_version = version;
			break;
		}
	}
	// current version is readable
	if (best_version == c->version) {
		return 0;
	}
	// didn't find a sensible chunk
	if (best_version == 0) {
		if (correct_only == 1) { // don't erase if the correct-only flag is set
			return 0;
		} else { // otherwise erase it
			chunk_delete_file_int(c, goal);
			return 1;
		}
	}
	// found a previous version which is readable
	c->version = best_version;
	for (auto &part : c->parts) {
		if (part.state == ChunkPart::INVALID && part.version == best_version) {
			part.state = ChunkPart::VALID;
		}
	}
	*nversion = best_version;
	c->needverincrease = 1;
	c->updateStats();
	chunk_update_checksum(c);
	return 1;
}
#endif

int chunk_set_version(uint64_t chunkid, uint32_t version) {
	Chunk *c;
	c = chunk_find(chunkid);
	if (c == NULL) {
		return LIZARDFS_ERROR_NOCHUNK;
	}
	c->version = version;
	chunk_update_checksum(c);
	return LIZARDFS_STATUS_OK;
}

int chunk_increase_version(uint64_t chunkid) {
	Chunk *c;
	c = chunk_find(chunkid);
	if (c == NULL) {
		return LIZARDFS_ERROR_NOCHUNK;
	}
	c->version++;
	chunk_update_checksum(c);
	return LIZARDFS_STATUS_OK;
}

uint8_t chunk_set_next_chunkid(uint64_t nextChunkIdToBeSet) {
	if (nextChunkIdToBeSet >= gChunksMetadata->nextchunkid) {
		gChunksMetadata->nextchunkid = nextChunkIdToBeSet;
		return LIZARDFS_STATUS_OK;
	} else {
		lzfs_pretty_syslog(LOG_WARNING, "was asked to increase the next chunk id to %" PRIu64 ", but it was "
				"already set to a bigger value %" PRIu64 ". Ignoring.",
				nextChunkIdToBeSet, gChunksMetadata->nextchunkid);
		return LIZARDFS_ERROR_MISMATCH;
	}
}

#ifndef METARESTORE

const ChunksReplicationState &chunk_get_replication_state() {
	return Chunk::allChunksReplicationState;
}

const ChunksAvailabilityState &chunk_get_availability_state() {
	return Chunk::allChunksAvailability;
}

struct ChunkLocation {
	ChunkLocation() : chunkType(slice_traits::standard::ChunkPartType()),
			chunkserver_version(0), distance(0), random(0) {
	}
	NetworkAddress address;
	ChunkPartType chunkType;
	uint32_t chunkserver_version;
	uint32_t distance;
	uint32_t random;
	MediaLabel label;
	bool operator<(const ChunkLocation &other) const {
		if (distance < other.distance) {
			return true;
		} else if (distance > other.distance) {
			return false;
		} else {
			return random < other.random;
		}
	}
};
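
// operator< above orders candidate locations by topology distance first and
// by a per-request random number second, so equally distant chunkservers are
// returned to clients in a shuffled order; this spreads read load across
// replicas instead of always favoring the same server.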

// TODO deduplicate
int chunk_getversionandlocations(uint64_t chunkid, uint32_t currentIp, uint32_t &version,
		uint32_t maxNumberOfChunkCopies, std::vector<ChunkTypeWithAddress> &serversList) {
	Chunk *c;
	uint8_t cnt;

	sassert(serversList.empty());
	c = chunk_find(chunkid);

	if (c == NULL) {
		return LIZARDFS_ERROR_NOCHUNK;
	}
	version = c->version;
	cnt = 0;
	std::vector<ChunkLocation> chunkLocation;
	ChunkLocation chunkserverLocation;
	for (const auto &part : c->parts) {
		if (part.is_valid()) {
			if (cnt < maxNumberOfChunkCopies && matocsserv_getlocation(part.server(),
					&(chunkserverLocation.address.ip),
					&(chunkserverLocation.address.port),
					&(chunkserverLocation.label)) == 0) {
				chunkserverLocation.chunkType = part.type;
				chunkserverLocation.chunkserver_version = matocsserv_get_version(part.server());
				chunkserverLocation.distance =
						topology_distance(chunkserverLocation.address.ip, currentIp);
				// in the future, prepare a more sophisticated distance function
				chunkserverLocation.random = rnd<uint32_t>();
				chunkLocation.push_back(chunkserverLocation);
				cnt++;
			}
		}
	}
	std::sort(chunkLocation.begin(), chunkLocation.end());
	for (uint32_t i = 0; i < chunkLocation.size(); ++i) {
		const ChunkLocation &loc = chunkLocation[i];
		serversList.emplace_back(loc.address, loc.chunkType, loc.chunkserver_version);
	}
	return LIZARDFS_STATUS_OK;
}

int chunk_getversionandlocations(uint64_t chunkid, uint32_t currentIp, uint32_t &version,
		uint32_t maxNumberOfChunkCopies, std::vector<ChunkPartWithAddressAndLabel> &serversList) {
	Chunk *c;
	uint8_t cnt;

	sassert(serversList.empty());
	c = chunk_find(chunkid);

	if (c == NULL) {
		return LIZARDFS_ERROR_NOCHUNK;
	}
	version = c->version;
	cnt = 0;
	std::vector<ChunkLocation> chunkLocation;
	ChunkLocation chunkserverLocation;
	for (const auto &part : c->parts) {
		if (part.is_valid()) {
			if (cnt < maxNumberOfChunkCopies && matocsserv_getlocation(part.server(),
					&(chunkserverLocation.address.ip),
					&(chunkserverLocation.address.port),
					&(chunkserverLocation.label)) == 0) {
				chunkserverLocation.chunkType = part.type;
				chunkserverLocation.distance =
						topology_distance(chunkserverLocation.address.ip, currentIp);
				// in the future, prepare a more sophisticated distance function
				chunkserverLocation.random = rnd<uint32_t>();
				chunkLocation.push_back(chunkserverLocation);
				cnt++;
			}
		}
	}
	std::sort(chunkLocation.begin(), chunkLocation.end());
	for (uint32_t i = 0; i < chunkLocation.size(); ++i) {
		const ChunkLocation &loc = chunkLocation[i];
		serversList.emplace_back(loc.address, static_cast<std::string>(loc.label), loc.chunkType);
	}
	return LIZARDFS_STATUS_OK;
}

void chunk_server_has_chunk(matocsserventry *ptr, uint64_t chunkid, uint32_t version, ChunkPartType chunkType) {
	Chunk *c;
	const uint32_t new_version = version & 0x7FFFFFFF;
	const bool todel = version & 0x80000000;
	c = chunk_find(chunkid);
	if (c == NULL) {
		// chunkserver has a nonexistent chunk, so create it for future deletion
		if (chunkid >= gChunksMetadata->nextchunkid) {
			fs_set_nextchunkid(FsContext::getForMaster(eventloop_time()), chunkid + 1);
		}
		c = chunk_new(chunkid, new_version);
		c->lockedto = (uint32_t)eventloop_time() + UNUSED_DELETE_TIMEOUT;
		c->lockid = 0;
		chunk_update_checksum(c);
	}
	auto server_csid = matocsserv_get_csdb(ptr)->csid;
	for (auto &part : c->parts) {
		if (part.csid == server_csid && part.type == chunkType) {
			// This server already notified us about its copy.
			// We normally don't get repeated notifications about the same copy, but
			// they can arrive after a chunkserver configuration reload (particularly
			// when folders change their 'to delete' status) or due to bugs.
			// Let's try to handle them as well as we can.
			switch (part.state) {
			case ChunkPart::DEL:
				// We requested deletion, but the chunkserver 'has' this copy again.
				// Repeat the deletion request.
				c->invalidateCopy(part);
				// fallthrough
			case ChunkPart::INVALID:
				// leave this copy alone
				return;
			default:
				break;
			}
			if (part.version != new_version) {
				lzfs_pretty_syslog(LOG_WARNING, "chunk %016" PRIX64 ": master data indicated "
						"version %08" PRIX32 ", chunkserver reports %08"
						PRIX32 "!!! Updating master data.", c->chunkid,
						part.version, new_version);
				part.version = new_version;
			}
			if (part.version != c->version) {
				c->markCopyAsHavingWrongVersion(part);
				return;
			}
			if (!part.is_todel() && todel) {
				part.mark_todel();
				c->updateStats();
			}
			if (part.is_todel() && !todel) {
				part.unmark_todel();
				c->updateStats();
			}
			return;
		}
	}
	const uint8_t state = (new_version == c->version) ? (todel ? ChunkPart::TDVALID : ChunkPart::VALID) : ChunkPart::INVALID;
	c->parts.push_back(ChunkPart(server_csid, state, new_version, chunkType));
	c->updateStats();
}

void chunk_damaged(matocsserventry *ptr, uint64_t chunkid, ChunkPartType chunk_type) {
	Chunk *c;
	c = chunk_find(chunkid);
	if (c == NULL) {
		// syslog(LOG_WARNING,"chunkserver has nonexistent chunk (%016" PRIX64 "), so create it for future deletion",chunkid);
		if (chunkid >= gChunksMetadata->nextchunkid) {
			gChunksMetadata->nextchunkid = chunkid + 1;
		}
		c = chunk_new(chunkid, 0);
	}
	auto server_csid = matocsserv_get_csdb(ptr)->csid;
	for (auto &part : c->parts) {
		if (part.csid == server_csid && part.type == chunk_type) {
			c->invalidateCopy(part);
			c->needverincrease = 1;
			return;
		}
	}
	c->parts.push_back(ChunkPart(server_csid, ChunkPart::INVALID, 0, slice_traits::standard::ChunkPartType()));
	c->updateStats();
	c->needverincrease = 1;
}

void chunk_lost(matocsserventry *ptr, uint64_t chunkid, ChunkPartType chunk_type) {
	Chunk *c = chunk_find(chunkid);
	if (c == nullptr) {
		return;
	}
	auto server_csid = matocsserv_get_csdb(ptr)->csid;
	auto it = std::remove_if(c->parts.begin(), c->parts.end(), [server_csid, chunk_type](const ChunkPart &part) {
		return part.csid == server_csid && part.type == chunk_type;
	});
	if (it != c->parts.end()) {
		c->parts.erase(it, c->parts.end());
		c->updateStats();
		c->needverincrease = 1;
	}
}

void chunk_server_disconnected(matocsserventry * /*ptr*/, const MediaLabel &label) {
	replicationDelayInfoForAll.serverDisconnected();
	if (label != MediaLabel::kWildcard) {
		replicationDelayInfoForLabel[label].serverDisconnected();
	}
	// If a chunkserver disconnects, we can only be sure it has been processed by the
	// zombie server loop once the loop has been executed at least twice.
	gDisconnectedCounter = 2;
	eventloop_make_next_poll_nonblocking();
	fs_cs_disconnected();
	gChunksMetadata->lastchunkid = 0;
	gChunksMetadata->lastchunkptr = NULL;
}

void chunk_server_unlabelled_connected() {
	replicationDelayInfoForAll.serverConnected();
}

void chunk_server_label_changed(const MediaLabel &previousLabel, const MediaLabel &newLabel) {
	/*
	 * Only a server with no label can be considered newly connected,
	 * and it was added to replicationDelayInfoForAll earlier,
	 * in the chunk_server_unlabelled_connected call.
	 */
	if (previousLabel == MediaLabel::kWildcard) {
		replicationDelayInfoForLabel[newLabel].serverConnected();
	}
}

/*
 * A function called in every main loop iteration; it cleans up chunk structs.
 */
void chunk_clean_zombie_servers_a_bit() {
	SignalLoopWatchdog watchdog;
	static uint32_t current_position = HASHSIZE;

	if (gDisconnectedCounter == 0) {
		return;
	}

	watchdog.start();
	while (current_position < HASHSIZE) {
		for (; gCurrentChunkInZombieLoop; gCurrentChunkInZombieLoop = gCurrentChunkInZombieLoop->next) {
			chunk_handle_disconnected_copies(gCurrentChunkInZombieLoop);
			if (watchdog.expired()) {
				eventloop_make_next_poll_nonblocking();
				return;
			}
		}
		++current_position;
		if (current_position < HASHSIZE) {
			gCurrentChunkInZombieLoop = gChunksMetadata->chunkhash[current_position];
		}
	}
	if (current_position >= HASHSIZE) {
		--gDisconnectedCounter;
		current_position = 0;
		gCurrentChunkInZombieLoop = gChunksMetadata->chunkhash[0];
	}
	eventloop_make_next_poll_nonblocking();
}
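
// Zombie-server cleanup is deliberately incremental: the loop walks the
// 2^20-bucket hash table from a static cursor and bails out as soon as the
// SignalLoopWatchdog reports the time slice as expired, resuming from the
// same chunk on the next event-loop iteration. gDisconnectedCounter is set
// to 2 on disconnection because a pass already in progress at that moment
// may miss the new zombie; only a pass that starts afterwards is guaranteed
// to sweep the whole table.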
1660
chunk_got_delete_status(matocsserventry * ptr,uint64_t chunkId,ChunkPartType chunkType,uint8_t)1661 void chunk_got_delete_status(matocsserventry *ptr, uint64_t chunkId, ChunkPartType chunkType, uint8_t /*status*/) {
1662 Chunk *c = chunk_find(chunkId);
1663 if (c==NULL) {
1664 return ;
1665 }
1666 auto server_csid = matocsserv_get_csdb(ptr)->csid;
1667 auto it = std::remove_if(c->parts.begin(), c->parts.end(), [server_csid, chunkType](const ChunkPart& part) {
1668 if (part.csid == server_csid && part.type == chunkType) {
1669 if (part.state != ChunkPart::DEL) {
1670 lzfs_pretty_syslog(LOG_WARNING, "got unexpected delete status");
1671 }
1672 return true;
1673 }
1674 return false;
1675 });
1676 if (it != c->parts.end()) {
1677 c->parts.erase(it, c->parts.end());
1678 c->updateStats();
1679 }
1680 }
1681
chunk_got_replicate_status(matocsserventry * ptr,uint64_t chunkId,uint32_t chunkVersion,ChunkPartType chunkType,uint8_t status)1682 void chunk_got_replicate_status(matocsserventry *ptr, uint64_t chunkId, uint32_t chunkVersion,
1683 ChunkPartType chunkType, uint8_t status) {
1684 Chunk *c = chunk_find(chunkId);
1685 if (c == NULL || status != 0) {
1686 return;
1687 }
1688
1689 auto server_csid = matocsserv_get_csdb(ptr)->csid;
1690 for (auto &part : c->parts) {
1691 if (part.type == chunkType && part.csid == server_csid) {
1692 lzfs_pretty_syslog(LOG_WARNING,
1693 "got replication status from server which had had that chunk before (chunk:%016"
1694 PRIX64 "_%08" PRIX32 ")", chunkId, chunkVersion);
1695 if (part.state == ChunkPart::VALID && chunkVersion != c->version) {
1696 part.version = chunkVersion;
1697 c->markCopyAsHavingWrongVersion(part);
1698 }
1699 return;
1700 }
1701 }
	const uint8_t state = (c->isLocked() || chunkVersion != c->version) ? ChunkPart::INVALID : ChunkPart::VALID;
	c->parts.push_back(ChunkPart(server_csid, state, chunkVersion, chunkType));
	c->updateStats();
}

void chunk_operation_status(Chunk *c, ChunkPartType chunkType, uint8_t status, matocsserventry *ptr) {
	bool any_copy_busy = false;
	auto server_csid = matocsserv_get_csdb(ptr)->csid;
	for (auto &part : c->parts) {
		if (part.csid == server_csid && part.type == chunkType) {
			if (status != 0) {
				c->interrupted = 1; // increase version after finish, just in case
				c->invalidateCopy(part);
			} else {
				if (part.is_busy()) {
					part.unmark_busy();
				}
			}
		}
		any_copy_busy |= part.is_busy();
	}
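	// Once no copy is busy, the whole operation is finished: bump the version
	// if any part failed (interrupted), otherwise report the final status to
	// the waiting client.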
	if (!any_copy_busy) {
		if (c->isWritable()) {
			if (c->interrupted) {
				chunk_emergency_increase_version(c);
			} else {
				matoclserv_chunk_status(c->chunkid, LIZARDFS_STATUS_OK);
				c->operation = Chunk::NONE;
				c->needverincrease = 0;
			}
		} else {
			matoclserv_chunk_status(c->chunkid, LIZARDFS_ERROR_NOTDONE);
			c->operation = Chunk::NONE;
		}
	}
}

void chunk_got_create_status(matocsserventry *ptr, uint64_t chunkId, ChunkPartType chunkType, uint8_t status) {
	Chunk *c = chunk_find(chunkId);
	if (c == NULL) {
		return;
	}
	chunk_operation_status(c, chunkType, status, ptr);
}

void chunk_got_duplicate_status(matocsserventry *ptr, uint64_t chunkId, ChunkPartType chunkType, uint8_t status) {
	Chunk *c = chunk_find(chunkId);
	if (c == NULL) {
		return;
	}
	chunk_operation_status(c, chunkType, status, ptr);
}

void chunk_got_setversion_status(matocsserventry *ptr, uint64_t chunkId, ChunkPartType chunkType, uint8_t status) {
	Chunk *c = chunk_find(chunkId);
	if (c == NULL) {
		return;
	}
	chunk_operation_status(c, chunkType, status, ptr);
}

void chunk_got_truncate_status(matocsserventry *ptr, uint64_t chunkid, ChunkPartType chunkType, uint8_t status) {
	Chunk *c = chunk_find(chunkid);
	if (c == NULL) {
		return;
	}
	chunk_operation_status(c, chunkType, status, ptr);
}

void chunk_got_duptrunc_status(matocsserventry *ptr, uint64_t chunkId, ChunkPartType chunkType, uint8_t status) {
	Chunk *c = chunk_find(chunkId);
	if (c == NULL) {
		return;
	}
	chunk_operation_status(c, chunkType, status, ptr);
}

/* ----------------------- */
/* JOBS (DELETE/REPLICATE) */
/* ----------------------- */

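// Serializes the statistics of the last completed chunk loop as 13
// consecutive 32-bit fields: the loop start/end timestamps, the done/notdone
// pairs for the four deletion counters and the undergoal-copy counter, and
// the rebalance counter.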
void chunk_store_info(uint8_t *buff) {
	put32bit(&buff, chunksinfo_loopstart);
	put32bit(&buff, chunksinfo_loopend);
	put32bit(&buff, chunksinfo.done.del_invalid);
	put32bit(&buff, chunksinfo.notdone.del_invalid);
	put32bit(&buff, chunksinfo.done.del_unused);
	put32bit(&buff, chunksinfo.notdone.del_unused);
	put32bit(&buff, chunksinfo.done.del_diskclean);
	put32bit(&buff, chunksinfo.notdone.del_diskclean);
	put32bit(&buff, chunksinfo.done.del_overgoal);
	put32bit(&buff, chunksinfo.notdone.del_overgoal);
	put32bit(&buff, chunksinfo.done.copy_undergoal);
	put32bit(&buff, chunksinfo.notdone.copy_undergoal);
	put32bit(&buff, chunksinfo.copy_rebalance);
}

// jobs state: jobshpos

class ChunkWorker : public coroutine {
public:
	ChunkWorker();
	void doEveryLoopTasks();
	void doEverySecondTasks();
	void doChunkJobs(Chunk *c, uint16_t serverCount);
	void mainLoop();

private:
	typedef std::vector<ServerWithUsage> ServersWithUsage;

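	// mainLoop() runs as a stackless coroutine, so its "locals" are kept here
	// (in stack_ below) instead of on the real stack, which lets them survive
	// the yields sprinkled through the loop.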
	struct MainLoopStack {
		uint32_t current_bucket;
		uint16_t usable_server_count;
		uint32_t chunks_done_count;
		uint32_t buckets_done_count;
		std::size_t endangered_to_serve;
		Chunk *node;
		Chunk *prev;
		ActiveLoopWatchdog work_limit;
		ActiveLoopWatchdog watchdog;
	};

	bool deleteUnusedChunks();

	uint32_t getMinChunkserverVersion(Chunk *c, ChunkPartType type);
	bool tryReplication(Chunk *c, ChunkPartType type, matocsserventry *destinationServer);

	void deleteInvalidChunkParts(Chunk *c);
	void deleteAllChunkParts(Chunk *c);
	bool replicateChunkPart(Chunk *c, Goal::Slice::Type slice_type, int slice_part,
			ChunkCopiesCalculator &calc, const IpCounter &ip_counter);
	bool removeUnneededChunkPart(Chunk *c, Goal::Slice::Type slice_type, int slice_part,
			ChunkCopiesCalculator &calc, const IpCounter &ip_counter);
	bool rebalanceChunkParts(Chunk *c, ChunkCopiesCalculator &calc, bool only_todel,
			const IpCounter &ip_counter);
	bool rebalanceChunkPartsWithSameIp(Chunk *c, ChunkCopiesCalculator &calc,
			const IpCounter &ip_counter);

	loop_info inforec_;
	uint32_t deleteNotDone_;
	uint32_t deleteDone_;
	uint32_t prevToDeleteCount_;
	uint32_t deleteLoopCount_;

	/// All chunkservers sorted by disk usage.
	ServersWithUsage sortedServers_;

	/// For each label, all servers with this label sorted by disk usage.
	std::map<MediaLabel, ServersWithUsage> labeledSortedServers_;

	MainLoopStack stack_;
};

ChunkWorker::ChunkWorker()
		: deleteNotDone_(0),
		  deleteDone_(0),
		  prevToDeleteCount_(0),
		  deleteLoopCount_(0) {
	memset(&inforec_, 0, sizeof(loop_info));
	stack_.current_bucket = 0;
}

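// Adaptive deletion throttling: every 16 loops, if more deletions were
// postponed than performed while the backlog kept growing, the per-server
// limit (TmpMaxDel) is raised by 50% up to the hard limit; once the backlog
// starts shrinking, it decays by the same factor back towards the soft limit.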
void ChunkWorker::doEveryLoopTasks() {
	deleteLoopCount_++;
	if (deleteLoopCount_ >= 16) {
		uint32_t toDeleteCount = deleteDone_ + deleteNotDone_;
		deleteLoopCount_ = 0;
		if ((deleteNotDone_ > deleteDone_) && (toDeleteCount > prevToDeleteCount_)) {
			TmpMaxDelFrac *= 1.5;
			if (TmpMaxDelFrac > MaxDelHardLimit) {
				lzfs_pretty_syslog(LOG_NOTICE, "DEL_LIMIT hard limit (%" PRIu32 " per server) reached", MaxDelHardLimit);
				TmpMaxDelFrac = MaxDelHardLimit;
			}
			TmpMaxDel = TmpMaxDelFrac;
			lzfs_pretty_syslog(LOG_NOTICE, "DEL_LIMIT temporarily increased to: %" PRIu32 " per server", TmpMaxDel);
		}
		if ((toDeleteCount < prevToDeleteCount_) && (TmpMaxDelFrac > MaxDelSoftLimit)) {
			TmpMaxDelFrac /= 1.5;
			if (TmpMaxDelFrac < MaxDelSoftLimit) {
				lzfs_pretty_syslog(LOG_NOTICE, "DEL_LIMIT back to soft limit (%" PRIu32 " per server)", MaxDelSoftLimit);
				TmpMaxDelFrac = MaxDelSoftLimit;
			}
			TmpMaxDel = TmpMaxDelFrac;
			lzfs_pretty_syslog(LOG_NOTICE, "DEL_LIMIT decreased back to: %" PRIu32 " per server", TmpMaxDel);
		}
		prevToDeleteCount_ = toDeleteCount;
		deleteNotDone_ = 0;
		deleteDone_ = 0;
	}
	chunksinfo = inforec_;
	memset(&inforec_, 0, sizeof(inforec_));
	chunksinfo_loopstart = chunksinfo_loopend;
	chunksinfo_loopend = eventloop_time();
}

void ChunkWorker::doEverySecondTasks() {
	sortedServers_ = matocsserv_getservers_sorted();
	labeledSortedServers_.clear();
	for (const ServerWithUsage &sw : sortedServers_) {
		labeledSortedServers_[sw.label].push_back(sw);
	}
}

static bool chunkPresentOnServer(Chunk *c, matocsserventry *server) {
	auto server_csid = matocsserv_get_csdb(server)->csid;
	return std::any_of(c->parts.begin(), c->parts.end(), [server_csid](const ChunkPart &part) {
		return part.csid == server_csid;
	});
}

static bool chunkPresentOnServer(Chunk *c, Goal::Slice::Type slice_type, matocsserventry *server) {
	auto server_csid = matocsserv_get_csdb(server)->csid;
	return std::any_of(c->parts.begin(), c->parts.end(), [server_csid, slice_type](const ChunkPart &part) {
		return part.csid == server_csid && part.type.getSliceType() == slice_type;
	});
}

uint32_t ChunkWorker::getMinChunkserverVersion(Chunk * /*c*/, ChunkPartType /*type*/) {
	return kFirstECVersion;
}

bool ChunkWorker::tryReplication(Chunk *c, ChunkPartType part_to_recover,
		matocsserventry *destination_server) {
	// TODO(msulikowski) Prefer VALID over TDVALID copies.
	std::vector<matocsserventry *> standard_servers;
	std::vector<matocsserventry *> all_servers;
	std::vector<ChunkPartType> all_parts;
	ChunkCopiesCalculator calc(c->getGoal());

	uint32_t destination_version = matocsserv_get_version(destination_server);

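	// Which sources may be used depends on the destination's protocol
	// version: servers older than kFirstXorVersion can only replicate full
	// standard copies, and only destinations at kFirstECVersion or newer (or
	// xor-capable ones, for xor parts) accept the liz_replicatechunk request
	// built from arbitrary slice types.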
	assert(destination_version >= getMinChunkserverVersion(c, part_to_recover));
	for (const auto &part : c->parts) {
		if (!part.is_valid() || part.is_busy() || matocsserv_replication_read_counter(part.server()) >= MaxReadRepl) {
			continue;
		}

		if (slice_traits::isStandard(part.type)) {
			standard_servers.push_back(part.server());
		}

		if (destination_version >= kFirstXorVersion && destination_version < kFirstECVersion
				&& slice_traits::isXor(part_to_recover) && matocsserv_get_version(part.server()) < kFirstXorVersion) {
			continue;
		}

		if (destination_version < kFirstXorVersion && !slice_traits::isStandard(part.type)) {
			continue;
		}

		all_servers.push_back(part.server());
		all_parts.push_back(part.type);
		calc.addPart(part.type, matocsserv_get_label(part.server()));
	}

	calc.evalRedundancyLevel();
	if (!calc.isRecoveryPossible()) {
		return false;
	}

	if (destination_version >= kFirstECVersion ||
			(destination_version >= kFirstXorVersion && slice_traits::isXor(part_to_recover))) {
		matocsserv_send_liz_replicatechunk(destination_server, c->chunkid, c->version,
		                                   part_to_recover, all_servers, all_parts);
		stats_replications++;
		c->needverincrease = 1;
		return true;
	}

	// fall back to legacy replication
	assert(slice_traits::isStandard(part_to_recover));

	if (standard_servers.empty()) {
		return false;
	}

	matocsserv_send_replicatechunk(destination_server, c->chunkid, c->version,
	                               standard_servers[rnd_ranged<uint32_t>(standard_servers.size())]);

	stats_replications++;
	c->needverincrease = 1;
	return true;
}

void ChunkWorker::deleteInvalidChunkParts(Chunk *c) {
	for (auto &part : c->parts) {
		if (matocsserv_deletion_counter(part.server()) < TmpMaxDel) {
			if (!part.is_valid()) {
				if (part.state == ChunkPart::DEL) {
					lzfs_pretty_syslog(LOG_WARNING,
					                   "chunk hasn't been deleted since previous loop - retry");
				}
				part.state = ChunkPart::DEL;
				stats_deletions++;
				matocsserv_send_deletechunk(part.server(), c->chunkid, 0, part.type);
				inforec_.done.del_invalid++;
				deleteDone_++;
			}
		} else {
			if (part.state == ChunkPart::INVALID) {
				inforec_.notdone.del_invalid++;
				deleteNotDone_++;
			}
		}
	}
}

void ChunkWorker::deleteAllChunkParts(Chunk *c) {
	for (auto &part : c->parts) {
		if (matocsserv_deletion_counter(part.server()) < TmpMaxDel) {
			if (part.is_valid() && !part.is_busy()) {
				c->deleteCopy(part);
				c->needverincrease = 1;
				stats_deletions++;
				matocsserv_send_deletechunk(part.server(), c->chunkid, c->version, part.type);
				inforec_.done.del_unused++;
				deleteDone_++;
			}
		} else {
			if (part.state == ChunkPart::VALID || part.state == ChunkPart::TDVALID) {
				inforec_.notdone.del_unused++;
				deleteNotDone_++;
			}
		}
	}
}

bool ChunkWorker::replicateChunkPart(Chunk *c, Goal::Slice::Type slice_type, int slice_part,
		ChunkCopiesCalculator &calc, const IpCounter &ip_counter) {
	std::vector<matocsserventry *> servers;
	int skipped_replications = 0, valid_parts_count = 0, expected_copies = 0;
	bool tried_to_replicate = false;
	Goal::Slice::Labels replicate_labels;

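	// For every label that is still missing a copy, try to pick a destination
	// server: prefer a server matching the label which holds no part of this
	// chunk at all; fall back to a server holding a part of another slice, or
	// to a non-matching label, only in the restricted cases handled below.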
	replicate_labels = calc.getLabelsToRecover(slice_type, slice_part);

	if (calc.getAvailable().find(slice_type) != calc.getAvailable().end()) {
		valid_parts_count =
		        Goal::Slice::countLabels(calc.getAvailable()[slice_type][slice_part]);
	}

	expected_copies = Goal::Slice::countLabels(calc.getTarget()[slice_type][slice_part]);

	uint32_t min_chunkserver_version = getMinChunkserverVersion(c, ChunkPartType(slice_type, slice_part));

	for (const auto &label_and_count : replicate_labels) {
		tried_to_replicate = true;

		if (jobsnorepbefore >= eventloop_time()) {
			break;
		}

		if (label_and_count.first == MediaLabel::kWildcard) {
			if (!replicationDelayInfoForAll.replicationAllowed(label_and_count.second)) {
				continue;
			}
		} else if (!replicationDelayInfoForLabel[label_and_count.first].replicationAllowed(
				label_and_count.second)) {
			skipped_replications += label_and_count.second;
			continue;
		}

		// Get a list of possible destination servers
		int total_matching, returned_matching, temporarily_unavailable;
		matocsserv_getservers_lessrepl(label_and_count.first, min_chunkserver_version, MaxWriteRepl,
		                               ip_counter, servers, total_matching, returned_matching,
		                               temporarily_unavailable);

		// Find a destination server for replication -- the first one without a copy of 'c'
		matocsserventry *destination = nullptr;
		matocsserventry *backup_destination = nullptr;
		for (const auto &server : servers) {
			if (!chunkPresentOnServer(c, server)) {
				destination = server;
				break;
			}
			if (backup_destination == nullptr && !chunkPresentOnServer(c, slice_type, server)) {
				backup_destination = server;
			}
		}

		// If no destination was found, use a backup one, i.e. a server which
		// holds a copy of this chunk, but from a different slice. Do it only
		// if there are no available chunkservers in the system at all, not if
		// they merely reached their replication limit.
		if (destination == nullptr && temporarily_unavailable == 0) {
			// there are no servers which are expected to be available soon,
			// so the backup server must be used
			destination = backup_destination;
		}

		if (destination == nullptr) {
			// there is no server suitable for replication to be written to
			break;
		}

		if (!(label_and_count.first == MediaLabel::kWildcard ||
				matocsserv_get_label(destination) == label_and_count.first)) {
			// the server we found doesn't match the requested label
			if (total_matching > returned_matching) {
				// There is a server which matches the current label, but it
				// has exceeded the replication limit. In this case we won't
				// try to use servers with non-matching labels as our
				// destination -- we will wait for that server to be ready.
				skipped_replications += label_and_count.second;
				continue;
			}
			if (!RebalancingBetweenLabels && !c->isEndangered()
					&& calc.isSafeEnoughToWrite(gRedundancyLevel)) {
				// Chunk is not endangered, so we should prevent label spilling.
				// Only endangered chunks will be replicated across labels.
				skipped_replications += label_and_count.second;
				continue;
			}
			if (valid_parts_count + skipped_replications >= expected_copies) {
				// Don't create copies on non-matching servers if there already
				// are enough replicas.
				continue;
			}
		}

		if (tryReplication(c, ChunkPartType(slice_type, slice_part), destination)) {
			inforec_.done.copy_undergoal++;
			return true;
		} else {
			// There is no server suitable for replication to be read from;
			// there's no need to analyze other labels if there's no free
			// source server.
			skipped_replications += label_and_count.second;
			break;
		}
	}
	if (tried_to_replicate) {
		inforec_.notdone.copy_undergoal++;
		// Enqueue the chunk again only if it was taken directly from the
		// endangered chunks queue, to avoid repetitions. If it was taken from
		// the chunk hashmap, the inEndangeredQueue bit would still be up.
		if (gEndangeredChunksServingLimit > 0 && Chunk::endangeredChunks.size() < gEndangeredChunksMaxCapacity
				&& !c->inEndangeredQueue && calc.getState() == ChunksAvailabilityState::kEndangered) {
			c->inEndangeredQueue = 1;
			Chunk::endangeredChunks.push_back(c);
		}
	}

	return false;
}

bool ChunkWorker::removeUnneededChunkPart(Chunk *c, Goal::Slice::Type slice_type, int slice_part,
		ChunkCopiesCalculator &calc, const IpCounter &ip_counter) {
	Goal::Slice::Labels remove_pool = calc.getRemovePool(slice_type, slice_part);
	if (remove_pool.empty()) {
		return false;
	}

	ChunkPart *candidate = nullptr;
	bool candidate_todel = false;
	int candidate_occurrence = 0;
	double candidate_usage = std::numeric_limits<double>::lowest();

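	// Choose the victim part with lexicographic priority: parts marked "to
	// delete" first, then parts on the IP address hosting the most copies,
	// then the part on the most utilized server (see the tuple comparison
	// below).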
	for (auto &part : c->parts) {
		if (!part.is_valid() || part.type != ChunkPartType(slice_type, slice_part)) {
			continue;
		}
		if (matocsserv_deletion_counter(part.server()) >= TmpMaxDel) {
			continue;
		}

		MediaLabel server_label = matocsserv_get_label(part.server());
		if (remove_pool.find(server_label) == remove_pool.end()) {
			continue;
		}

		bool is_todel = part.is_todel();
		double usage = matocsserv_get_usage(part.server());
		int occurrence = ip_counter.empty() ? 1 : ip_counter.at(matocsserv_get_servip(part.server()));

		if (std::make_tuple(is_todel, occurrence, usage) >
				std::make_tuple(candidate_todel, candidate_occurrence, candidate_usage)) {
			candidate = &part;
			candidate_usage = usage;
			candidate_todel = is_todel;
			candidate_occurrence = occurrence;
		}
	}

	if (candidate &&
			calc.canRemovePart(slice_type, slice_part, matocsserv_get_label(candidate->server()))) {
		c->deleteCopy(*candidate);
		c->needverincrease = 1;
		stats_deletions++;
		matocsserv_send_deletechunk(candidate->server(), c->chunkid, 0, candidate->type);

		int overgoal_copies = calc.countPartsToMove(slice_type, slice_part).second;

		inforec_.done.del_overgoal++;
		deleteDone_++;
		inforec_.notdone.del_overgoal += overgoal_copies - 1;
		deleteNotDone_ += overgoal_copies - 1;

		return true;
	}

	return false;
}

bool ChunkWorker::rebalanceChunkParts(Chunk *c, ChunkCopiesCalculator &calc, bool only_todel,
		const IpCounter &ip_counter) {
	if (!only_todel) {
		double min_usage = sortedServers_.front().disk_usage;
		double max_usage = sortedServers_.back().disk_usage;
		if ((max_usage - min_usage) <= gAcceptableDifference) {
			return false;
		}
	}

	// Consider moving each copy to a server with much lower disk usage than
	// its current one. There are at least two servers with a disk usage
	// difference greater than gAcceptableDifference, so it's worth checking.
	for (const auto &part : c->parts) {
		if (!part.is_valid()) {
			continue;
		}

		if (only_todel && !part.is_todel()) {
			continue;
		}

		auto current_ip = matocsserv_get_servip(part.server());
		auto it = ip_counter.find(current_ip);
		auto current_ip_count = it != ip_counter.end() ? it->second : 0;

		MediaLabel current_copy_label = matocsserv_get_label(part.server());
		double current_copy_disk_usage = matocsserv_get_usage(part.server());
		// Look for a server with disk usage much lower than
		// current_copy_disk_usage. If such a server exists, consider creating
		// a new copy of this chunk there. First, choose all possible
		// candidates for the destination server: we consider only servers
		// with the same label if rebalancing between labels is turned off or
		// if the goal requires our copy to exist on a server labeled
		// 'current_copy_label'.
		bool multi_label_rebalance =
		        RebalancingBetweenLabels &&
		        (current_copy_label == MediaLabel::kWildcard ||
		         calc.canMovePartToDifferentLabel(part.type.getSliceType(),
		                                          part.type.getSlicePart(),
		                                          current_copy_label));

		uint32_t min_chunkserver_version = getMinChunkserverVersion(c, part.type);

		const ServersWithUsage &sorted_servers =
		        multi_label_rebalance ? sortedServers_
		                              : labeledSortedServers_[current_copy_label];

		for (const auto &empty_server : sorted_servers) {
			if (!only_todel && gAvoidSameIpChunkservers) {
				auto empty_server_ip = matocsserv_get_servip(empty_server.server);
				auto it = ip_counter.find(empty_server_ip);
				auto empty_server_ip_count = it != ip_counter.end() ? it->second : 0;
				if (empty_server_ip != current_ip && empty_server_ip_count >= current_ip_count) {
					continue;
				}
			}

			if (!only_todel && empty_server.disk_usage >
					current_copy_disk_usage - gAcceptableDifference) {
				break; // no more suitable destination servers (the remaining
				       // servers have higher usage)
			}
			if (matocsserv_get_version(empty_server.server) < min_chunkserver_version) {
				continue;
			}
			if (chunkPresentOnServer(c, part.type.getSliceType(), empty_server.server)) {
				continue; // a copy is already here
			}
			if (matocsserv_replication_write_counter(empty_server.server) >= MaxWriteRepl) {
				continue; // we can't create a new copy here
			}
			if (tryReplication(c, part.type, empty_server.server)) {
				inforec_.copy_rebalance++;
				return true;
			}
		}
	}

	return false;
}

bool ChunkWorker::rebalanceChunkPartsWithSameIp(Chunk *c, ChunkCopiesCalculator &calc,
		const IpCounter &ip_counter) {
	if (!gAvoidSameIpChunkservers) {
		return false;
	}

	for (const auto &part : c->parts) {
		if (!part.is_valid()) {
			continue;
		}

		auto current_ip = matocsserv_get_servip(part.server());
		auto it = ip_counter.find(current_ip);
		auto current_ip_count = it != ip_counter.end() ? it->second : 0;

		MediaLabel current_copy_label = matocsserv_get_label(part.server());

		bool multi_label_rebalance =
		        RebalancingBetweenLabels &&
		        (current_copy_label == MediaLabel::kWildcard ||
		         calc.canMovePartToDifferentLabel(part.type.getSliceType(),
		                                          part.type.getSlicePart(),
		                                          current_copy_label));

		uint32_t min_chunkserver_version = getMinChunkserverVersion(c, part.type);

		const ServersWithUsage &sorted_servers =
		        multi_label_rebalance ? sortedServers_
		                              : labeledSortedServers_[current_copy_label];

		ServersWithUsage sorted_by_ip_count;
		sorted_by_ip_count.resize(sorted_servers.size());
		counting_sort_copy(sorted_servers.begin(), sorted_servers.end(), sorted_by_ip_count.begin(),
				[&ip_counter](const ServerWithUsage &elem) {
					auto ip = matocsserv_get_servip(elem.server);
					auto it = ip_counter.find(ip);
					return it != ip_counter.end() ? it->second : 0;
				});

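		// Candidate destinations are ordered by how many parts of this chunk
		// already sit behind their IP, so the least loaded addresses are
		// tried first; moving a part only helps if it lowers the per-IP
		// count, hence the (current_ip_count - 1) cut-off below.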
		for (const auto &empty_server : sorted_by_ip_count) {
			auto empty_server_ip = matocsserv_get_servip(empty_server.server);
			auto it = ip_counter.find(empty_server_ip);
			auto empty_server_ip_count = it != ip_counter.end() ? it->second : 0;
			if (empty_server_ip_count >= (current_ip_count - 1)) {
				break;
			}

			if (matocsserv_get_version(empty_server.server) < min_chunkserver_version) {
				continue;
			}
			if (chunkPresentOnServer(c, part.type.getSliceType(), empty_server.server)) {
				continue; // a copy is already here
			}
			if (matocsserv_replication_write_counter(empty_server.server) >= MaxWriteRepl) {
				continue; // we can't create a new copy here
			}
			if (tryReplication(c, part.type, empty_server.server)) {
				inforec_.copy_rebalance++;
				return true;
			}
		}
	}

	return false;
}

void ChunkWorker::doChunkJobs(Chunk *c, uint16_t serverCount) {
	// step 0. update the chunk's statistics
	// (useful e.g. if the definitions of goals have changed)
	chunk_handle_disconnected_copies(c);
	c->updateStats();
	if (serverCount == 0) {
		return;
	}

	int invalid_parts = 0;
	ChunkCopiesCalculator calc(c->getGoal());

	// A chunk is in a degenerate state if it has more than one part
	// on the same chunkserver (e.g. 1 std and 1 xor)
	// TODO(sarna): this flat_set should be removed after
	// 'slists' are rewritten to use sensible data structures
	bool degenerate = false;
	flat_set<matocsserventry *, small_vector<matocsserventry *, 64>> servers;

	// step 1. calculate the number of valid and invalid copies
	for (const auto &part : c->parts) {
		if (part.is_valid()) {
			calc.addPart(part.type, matocsserv_get_label(part.server()));
			if (!degenerate) {
				degenerate = servers.count(part.server()) > 0;
				servers.insert(part.server());
			}
		} else {
			++invalid_parts;
		}
	}
	calc.optimize();

	// step 1a. count the chunk parts residing on servers with the same ip
	IpCounter ip_occurrence;
	if (gAvoidSameIpChunkservers) {
		for (auto &part : c->parts) {
			if (part.is_valid()) {
				++ip_occurrence[matocsserv_get_servip(part.server())];
			}
		}
	}

	// step 2. check the number of copies
	if (c->isLost() && invalid_parts > 0 && c->fileCount() > 0) {
		lzfs_pretty_syslog(LOG_WARNING, "chunk %016" PRIX64 " doesn't have enough valid parts (%d),"
		                   " consider repairing it manually", c->chunkid, invalid_parts);
		for (const auto &part : c->parts) {
			if (!part.is_valid()) {
				lzfs_pretty_syslog(LOG_NOTICE, "chunk %016" PRIX64 "_%08" PRIX32 " - invalid part on (%s - ver:%08" PRIX32 ")",
				                   c->chunkid, c->version, matocsserv_getstrip(part.server()), part.version);
			}
		}
		return;
	}

	// step 3. delete invalid parts
	deleteInvalidChunkParts(c);

	// step 4. return if an operation is already in progress on the chunk
	if (c->operation != Chunk::NONE || (c->isLocked())) {
		return;
	}

	// step 5. check the busy count
	for (const auto &part : c->parts) {
		if (part.is_busy()) {
			lzfs_pretty_syslog(LOG_WARNING, "chunk %016" PRIX64 " has unexpected BUSY copies",
			                   c->chunkid);
			return;
		}
	}

	// step 6. delete an unused chunk
	if (c->fileCount() == 0) {
		deleteAllChunkParts(c);
		return;
	}

	if (c->isLost()) {
		return;
	}

	// step 7. check if the chunk needs any replication
	for (const auto &slice : calc.getTarget()) {
		for (int i = 0; i < slice.size(); ++i) {
			if (replicateChunkPart(c, slice.getType(), i, calc, ip_occurrence)) {
				return;
			}
		}
	}

	// Do not remove any parts if more than one part resides on one chunkserver
	if (degenerate && calc.countPartsToRecover() > 0) {
		return;
	}

	// step 8. if the chunk has too many copies then delete some of them
	for (const auto &slice : calc.getAvailable()) {
		for (int i = 0; i < slice.size(); ++i) {
			std::pair<int, int> operations = calc.countPartsToMove(slice.getType(), i);
			if (operations.first > 0 || operations.second == 0) {
				// do not remove elements if some are missing
				continue;
			}

			if (removeUnneededChunkPart(c, slice.getType(), i, calc, ip_occurrence)) {
				return;
			}
		}
	}

	// step 9. if the chunk has parts marked as "to delete" then move them to other servers
	if (rebalanceChunkParts(c, calc, true, ip_occurrence)) {
		return;
	}

	// While undergoal replications are both succeeding and still pending,
	// postpone rebalancing (presumably to keep the replication bandwidth
	// available for recovery).
	if (chunksinfo.notdone.copy_undergoal > 0 && chunksinfo.done.copy_undergoal > 0) {
		return;
	}

	// step 10. move chunk parts residing on chunkservers with the same ip
	if (rebalanceChunkPartsWithSameIp(c, calc, ip_occurrence)) {
		return;
	}

	// step 11. if there is too big a difference between chunkservers then copy the chunk
	// from a server with high disk usage to a server with low disk usage
	if (rebalanceChunkParts(c, calc, false, ip_occurrence)) {
		return;
	}
}

bool ChunkWorker::deleteUnusedChunks() {
	while (stack_.node != nullptr) {
		chunk_handle_disconnected_copies(stack_.node);
		if (stack_.node->fileCount() == 0 && stack_.node->parts.empty()) {
			// If the current chunk of the zombie loop is about to be deleted,
			// it must be advanced to the next chunk
			if (stack_.node == gCurrentChunkInZombieLoop) {
				gCurrentChunkInZombieLoop = gCurrentChunkInZombieLoop->next;
			}
			// Something could have been inserted between prev and node (while
			// we yielded), so we need to make prev valid again.
			while (stack_.prev && stack_.prev->next != stack_.node) {
				stack_.prev = stack_.prev->next;
			}

			assert((!stack_.prev && gChunksMetadata->chunkhash[stack_.current_bucket] == stack_.node) ||
			       (stack_.prev && stack_.prev->next == stack_.node));

			if (stack_.prev) {
				stack_.prev->next = stack_.node->next;
			} else {
				gChunksMetadata->chunkhash[stack_.current_bucket] = stack_.node->next;
			}

			Chunk *tmp = stack_.node->next;
			chunk_delete(stack_.node);
			stack_.node = tmp;
		} else {
			stack_.prev = stack_.node;
			stack_.node = stack_.node->next;
		}

		if (stack_.watchdog.expired()) {
			return false;
		}
	}

	return true;
}

void ChunkWorker::mainLoop() {
	Chunk *c;

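	// Stackless coroutine body: reenter()/yield come from common/coroutine.h.
	// Every yield returns to the event loop and execution resumes here on the
	// next call, which is why all state lives in stack_ rather than in true
	// locals.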
	reenter(this) {
		stack_.work_limit.setMaxDuration(std::chrono::milliseconds(ChunksLoopTimeout));
		stack_.work_limit.start();
		stack_.watchdog.start();
		stack_.chunks_done_count = 0;
		stack_.buckets_done_count = 0;

		if (starttime + gOperationsDelayInit > eventloop_time()) {
			return;
		}

		double min_usage, max_usage;
		matocsserv_usagedifference(&min_usage, &max_usage, &stack_.usable_server_count, nullptr);

		if (min_usage > max_usage) {
			return;
		}

		doEverySecondTasks();

		if (jobsnorepbefore < eventloop_time()) {
			stack_.endangered_to_serve = gEndangeredChunksServingLimit;
			while (stack_.endangered_to_serve > 0 && !Chunk::endangeredChunks.empty()) {
				c = Chunk::endangeredChunks.front();
				Chunk::endangeredChunks.pop_front();
				// If the queued chunk is obsolete (e.g. it was freed while in
				// the queue), do not proceed with chunk jobs.
				if (c->inEndangeredQueue == 1) {
					c->inEndangeredQueue = 0;
					doChunkJobs(c, stack_.usable_server_count);
				}
				--stack_.endangered_to_serve;

				if (stack_.watchdog.expired()) {
					yield;
					stack_.watchdog.start();
				}
			}
		}

		while (stack_.buckets_done_count < HashSteps &&
				stack_.chunks_done_count < HashCPS) {
			if (stack_.current_bucket == 0) {
				doEveryLoopTasks();
			}

			if (stack_.watchdog.expired()) {
				yield;
				stack_.watchdog.start();
			}

			// delete unused chunks
			stack_.prev = nullptr;
			stack_.node = gChunksMetadata->chunkhash[stack_.current_bucket];
			while (!deleteUnusedChunks()) {
				yield;
				stack_.watchdog.start();
			}

			// regenerate usable_server_count
			matocsserv_usagedifference(nullptr, nullptr, &stack_.usable_server_count, nullptr);

			stack_.node = gChunksMetadata->chunkhash[stack_.current_bucket];
			while (stack_.node) {
				doChunkJobs(stack_.node, stack_.usable_server_count);
				++stack_.chunks_done_count;
				stack_.node = stack_.node->next;

				if (stack_.watchdog.expired()) {
					yield;
					stack_.watchdog.start();
					matocsserv_usagedifference(nullptr, nullptr,
					                           &stack_.usable_server_count, nullptr);
				}
			}

			// since HASHSIZE is a power of 2, any odd stride visits every bucket
			stack_.current_bucket += 123;
			stack_.current_bucket %= HASHSIZE;
			++stack_.buckets_done_count;

			if (stack_.work_limit.expired()) {
				break;
			}
		}
	}
}

static std::unique_ptr<ChunkWorker> gChunkWorker;

void chunk_jobs_main(void) {
	if (gChunkWorker->is_complete()) {
		gChunkWorker->reset();
	}
}

void chunk_jobs_process_bit(void) {
	if (!gChunkWorker->is_complete()) {
		gChunkWorker->mainLoop();
		if (!gChunkWorker->is_complete()) {
			eventloop_make_next_poll_nonblocking();
		}
	}
}

#endif

constexpr uint32_t kSerializedChunkSizeNoLockId = 16;
constexpr uint32_t kSerializedChunkSizeWithLockId = 20;
#define CHUNKCNT 1000

#ifdef METARESTORE

void chunk_dump(void) {
	Chunk *c;
	uint32_t i;

	for (i = 0; i < HASHSIZE; i++) {
		for (c = gChunksMetadata->chunkhash[i]; c; c = c->next) {
			printf("*|i:%016" PRIX64 "|v:%08" PRIX32 "|g:%" PRIu8 "|t:%10" PRIu32 "\n",
			       c->chunkid, c->version, c->highestIdGoal(), c->lockedto);
		}
	}
}

#endif

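/*
 * On-disk layout handled by chunk_load/chunk_store: an 8-byte header holding
 * nextchunkid, followed by fixed-size records (chunkid:64, version:32,
 * lockedto:32 and, in the newer format, lockid:32 - i.e. 16 or 20 bytes per
 * record). A terminating record with chunkid, version and lockedto all zero
 * marks the end of the list.
 */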
int chunk_load(FILE *fd, bool loadLockIds) {
	uint8_t hdr[8];
	const uint8_t *ptr;
	int32_t r;
	Chunk *c;
	// chunkdata
	uint64_t chunkid;

	if (fread(hdr, 1, 8, fd) != 8) {
		return -1;
	}
	ptr = hdr;
	gChunksMetadata->nextchunkid = get64bit(&ptr);
	int32_t serializedChunkSize = (loadLockIds
			? kSerializedChunkSizeWithLockId : kSerializedChunkSizeNoLockId);
	std::vector<uint8_t> loadbuff(serializedChunkSize);
	for (;;) {
		r = fread(loadbuff.data(), 1, serializedChunkSize, fd);
		if (r != serializedChunkSize) {
			return -1;
		}
		ptr = loadbuff.data();
		chunkid = get64bit(&ptr);
		if (chunkid > 0) {
			uint32_t version = get32bit(&ptr);
			c = chunk_new(chunkid, version);
			c->lockedto = get32bit(&ptr);
			if (loadLockIds) {
				c->lockid = get32bit(&ptr);
			}
		} else {
			uint32_t version = get32bit(&ptr);
			uint32_t lockedto = get32bit(&ptr);
			if (version == 0 && lockedto == 0) {
				return 0;
			} else {
				return -1;
			}
		}
	}
	return 0; // unreachable
}

void chunk_store(FILE *fd) {
	passert(gChunksMetadata);
	uint8_t hdr[8];
	uint8_t storebuff[kSerializedChunkSizeWithLockId * CHUNKCNT];
	uint8_t *ptr;
	uint32_t i, j;
	Chunk *c;
	// chunkdata
	uint64_t chunkid;
	uint32_t version;
	uint32_t lockedto, lockid;
	ptr = hdr;
	put64bit(&ptr, gChunksMetadata->nextchunkid);
	if (fwrite(hdr, 1, 8, fd) != (size_t)8) {
		return;
	}
	j = 0;
	ptr = storebuff;
	for (i = 0; i < HASHSIZE; i++) {
		for (c = gChunksMetadata->chunkhash[i]; c; c = c->next) {
#ifndef METARESTORE
			chunk_handle_disconnected_copies(c);
#endif
			chunkid = c->chunkid;
			put64bit(&ptr, chunkid);
			version = c->version;
			put32bit(&ptr, version);
			lockedto = c->lockedto;
			lockid = c->lockid;
			put32bit(&ptr, lockedto);
			put32bit(&ptr, lockid);
			j++;
			if (j == CHUNKCNT) {
				size_t writtenBlockSize = kSerializedChunkSizeWithLockId * CHUNKCNT;
				if (fwrite(storebuff, 1, writtenBlockSize, fd) != writtenBlockSize) {
					return;
				}
				j = 0;
				ptr = storebuff;
			}
		}
	}
	memset(ptr, 0, kSerializedChunkSizeWithLockId);
	j++;
	size_t writtenBlockSize = kSerializedChunkSizeWithLockId * j;
	if (fwrite(storebuff, 1, writtenBlockSize, fd) != writtenBlockSize) {
		return;
	}
}

void chunk_unload(void) {
	delete gChunksMetadata;
	gChunksMetadata = nullptr;
}

void chunk_newfs(void) {
#ifndef METARESTORE
	Chunk::count = 0;
#endif
	gChunksMetadata->nextchunkid = 1;
}

#ifndef METARESTORE
void chunk_become_master() {
	starttime = eventloop_time();
	jobsnorepbefore = starttime + gOperationsDelayInit;
	gChunkWorker = std::unique_ptr<ChunkWorker>(new ChunkWorker());
	gChunkLoopEventHandle = eventloop_timeregister_ms(ChunksLoopPeriod, chunk_jobs_main);
	eventloop_eachloopregister(chunk_jobs_process_bit);
	return;
}

void chunk_reload(void) {
	uint32_t repl;
	uint32_t looptime;

	// Set the deprecated values first and override them if a newer definition
	// is found
	gOperationsDelayInit = cfg_getuint32("REPLICATIONS_DELAY_INIT", 300);
	gOperationsDelayDisconnect = cfg_getuint32("REPLICATIONS_DELAY_DISCONNECT", 3600);
	gOperationsDelayInit = cfg_getuint32("OPERATIONS_DELAY_INIT", gOperationsDelayInit);
	gOperationsDelayDisconnect = cfg_getuint32("OPERATIONS_DELAY_DISCONNECT", gOperationsDelayDisconnect);
	gAvoidSameIpChunkservers = cfg_getuint32("AVOID_SAME_IP_CHUNKSERVERS", 0);
	gRedundancyLevel = cfg_getuint32("REDUNDANCY_LEVEL", 0);

	uint32_t disableChunksDel = cfg_getuint32("DISABLE_CHUNKS_DEL", 0);
	if (disableChunksDel) {
		MaxDelSoftLimit = MaxDelHardLimit = 0;
	} else {
		uint32_t oldMaxDelSoftLimit = MaxDelSoftLimit;
		uint32_t oldMaxDelHardLimit = MaxDelHardLimit;

		MaxDelSoftLimit = cfg_getuint32("CHUNKS_SOFT_DEL_LIMIT", 10);
		if (cfg_isdefined("CHUNKS_HARD_DEL_LIMIT")) {
			MaxDelHardLimit = cfg_getuint32("CHUNKS_HARD_DEL_LIMIT", 25);
			if (MaxDelHardLimit < MaxDelSoftLimit) {
				MaxDelSoftLimit = MaxDelHardLimit;
				lzfs_pretty_syslog(LOG_WARNING, "CHUNKS_SOFT_DEL_LIMIT is greater than CHUNKS_HARD_DEL_LIMIT - using CHUNKS_HARD_DEL_LIMIT for both");
			}
		} else {
			MaxDelHardLimit = 3 * MaxDelSoftLimit;
		}
		if (MaxDelSoftLimit == 0) {
			MaxDelSoftLimit = oldMaxDelSoftLimit;
			MaxDelHardLimit = oldMaxDelHardLimit;
		}
	}
	if (TmpMaxDelFrac < MaxDelSoftLimit) {
		TmpMaxDelFrac = MaxDelSoftLimit;
	}
	if (TmpMaxDelFrac > MaxDelHardLimit) {
		TmpMaxDelFrac = MaxDelHardLimit;
	}
	if (TmpMaxDel < MaxDelSoftLimit) {
		TmpMaxDel = MaxDelSoftLimit;
	}
	if (TmpMaxDel > MaxDelHardLimit) {
		TmpMaxDel = MaxDelHardLimit;
	}

	repl = cfg_getuint32("CHUNKS_WRITE_REP_LIMIT", 2);
	if (repl > 0) {
		MaxWriteRepl = repl;
	}

	repl = cfg_getuint32("CHUNKS_READ_REP_LIMIT", 10);
	if (repl > 0) {
		MaxReadRepl = repl;
	}

	ChunksLoopPeriod = cfg_get_minmaxvalue<uint32_t>("CHUNKS_LOOP_PERIOD", 1000, MINCHUNKSLOOPPERIOD, MAXCHUNKSLOOPPERIOD);
	if (gChunkLoopEventHandle) {
		eventloop_timechange_ms(gChunkLoopEventHandle, ChunksLoopPeriod);
	}

	repl = cfg_get_minmaxvalue<uint32_t>("CHUNKS_LOOP_MAX_CPU", 60, MINCHUNKSLOOPCPU, MAXCHUNKSLOOPCPU);
	ChunksLoopTimeout = repl * ChunksLoopPeriod / 100;

	if (cfg_isdefined("CHUNKS_LOOP_TIME")) {
		looptime = cfg_get_minmaxvalue<uint32_t>("CHUNKS_LOOP_TIME", 300, MINLOOPTIME, MAXLOOPTIME);
		uint64_t scaled_looptime = std::max((uint64_t)1000 * looptime / ChunksLoopPeriod, (uint64_t)1);
		HashSteps = 1 + ((HASHSIZE) / scaled_looptime);
		HashCPS = 0xFFFFFFFF;
	} else {
		looptime = cfg_get_minmaxvalue<uint32_t>("CHUNKS_LOOP_MIN_TIME", 300, MINLOOPTIME, MAXLOOPTIME);
		HashCPS = cfg_get_minmaxvalue<uint32_t>("CHUNKS_LOOP_MAX_CPS", 100000, MINCPS, MAXCPS);
		uint64_t scaled_looptime = std::max((uint64_t)1000 * looptime / ChunksLoopPeriod, (uint64_t)1);
		HashSteps = 1 + ((HASHSIZE) / scaled_looptime);
		HashCPS = (uint64_t)ChunksLoopPeriod * HashCPS / 1000;
	}
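	// Worked example (a sketch with the defaults above): ChunksLoopPeriod =
	// 1000 ms and CHUNKS_LOOP_MIN_TIME = 300 s give scaled_looptime = 300, so
	// HashSteps = 1 + 0x100000 / 300 = 3496 buckets per invocation, and a
	// full pass over the hash table takes roughly 300 loop periods
	// (~5 minutes).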
	double endangeredChunksPriority = cfg_ranged_get("ENDANGERED_CHUNKS_PRIORITY", 0.0, 0.0, 1.0);
	gEndangeredChunksServingLimit = HashSteps * endangeredChunksPriority;
	gEndangeredChunksMaxCapacity = cfg_get("ENDANGERED_CHUNKS_MAX_CAPACITY", static_cast<uint64_t>(1024 * 1024UL));
	gAcceptableDifference = cfg_ranged_get("ACCEPTABLE_DIFFERENCE", 0.1, 0.001, 10.0);
	RebalancingBetweenLabels = cfg_getuint32("CHUNKS_REBALANCING_BETWEEN_LABELS", 0) == 1;
}
#endif

int chunk_strinit(void) {
	gChunksMetadata = new ChunksMetadata;

#ifndef METARESTORE
	Chunk::count = 0;
	for (int i = 0; i < CHUNK_MATRIX_SIZE; ++i) {
		for (int j = 0; j < CHUNK_MATRIX_SIZE; ++j) {
			Chunk::allFullChunkCopies[i][j] = 0;
		}
	}
	Chunk::allChunksAvailability = ChunksAvailabilityState();
	Chunk::allChunksReplicationState = ChunksReplicationState();

	uint32_t disableChunksDel = cfg_getuint32("DISABLE_CHUNKS_DEL", 0);
	gOperationsDelayInit = cfg_getuint32("REPLICATIONS_DELAY_INIT", 300);
	gOperationsDelayDisconnect = cfg_getuint32("REPLICATIONS_DELAY_DISCONNECT", 3600);
	gOperationsDelayInit = cfg_getuint32("OPERATIONS_DELAY_INIT", gOperationsDelayInit);
	gOperationsDelayDisconnect = cfg_getuint32("OPERATIONS_DELAY_DISCONNECT", gOperationsDelayDisconnect);
	gAvoidSameIpChunkservers = cfg_getuint32("AVOID_SAME_IP_CHUNKSERVERS", 0);
	gRedundancyLevel = cfg_getuint32("REDUNDANCY_LEVEL", 0);

	if (disableChunksDel) {
		MaxDelHardLimit = MaxDelSoftLimit = 0;
	} else {
		MaxDelSoftLimit = cfg_getuint32("CHUNKS_SOFT_DEL_LIMIT", 10);
		if (cfg_isdefined("CHUNKS_HARD_DEL_LIMIT")) {
			MaxDelHardLimit = cfg_getuint32("CHUNKS_HARD_DEL_LIMIT", 25);
			if (MaxDelHardLimit < MaxDelSoftLimit) {
				MaxDelSoftLimit = MaxDelHardLimit;
				lzfs_pretty_syslog(LOG_WARNING, "%s: CHUNKS_SOFT_DEL_LIMIT is greater than "
				                   "CHUNKS_HARD_DEL_LIMIT - using CHUNKS_HARD_DEL_LIMIT for both",
				                   cfg_filename().c_str());
			}
		} else {
			MaxDelHardLimit = 3 * MaxDelSoftLimit;
		}
		if (MaxDelSoftLimit == 0) {
			throw InitializeException(cfg_filename() + ": CHUNKS_SOFT_DEL_LIMIT is zero");
		}
	}
	TmpMaxDelFrac = MaxDelSoftLimit;
	TmpMaxDel = MaxDelSoftLimit;
	MaxWriteRepl = cfg_getuint32("CHUNKS_WRITE_REP_LIMIT", 2);
	MaxReadRepl = cfg_getuint32("CHUNKS_READ_REP_LIMIT", 10);
	if (MaxReadRepl == 0) {
		throw InitializeException(cfg_filename() + ": CHUNKS_READ_REP_LIMIT is zero");
	}
	if (MaxWriteRepl == 0) {
		throw InitializeException(cfg_filename() + ": CHUNKS_WRITE_REP_LIMIT is zero");
	}

	ChunksLoopPeriod = cfg_get_minmaxvalue<uint32_t>("CHUNKS_LOOP_PERIOD", 1000, MINCHUNKSLOOPPERIOD, MAXCHUNKSLOOPPERIOD);
	uint32_t repl = cfg_get_minmaxvalue<uint32_t>("CHUNKS_LOOP_MAX_CPU", 60, MINCHUNKSLOOPCPU, MAXCHUNKSLOOPCPU);
	ChunksLoopTimeout = repl * ChunksLoopPeriod / 100;

	uint32_t looptime;
	if (cfg_isdefined("CHUNKS_LOOP_TIME")) {
		lzfs_pretty_syslog(LOG_WARNING,
		                   "%s: defining loop time by CHUNKS_LOOP_TIME option is "
		                   "deprecated - use CHUNKS_LOOP_MAX_CPS and CHUNKS_LOOP_MIN_TIME",
		                   cfg_filename().c_str());
		looptime = cfg_get_minmaxvalue<uint32_t>("CHUNKS_LOOP_TIME", 300, MINLOOPTIME, MAXLOOPTIME);
		uint64_t scaled_looptime = std::max((uint64_t)1000 * looptime / ChunksLoopPeriod, (uint64_t)1);
		HashSteps = 1 + ((HASHSIZE) / scaled_looptime);
		HashCPS = 0xFFFFFFFF;
	} else {
		looptime = cfg_get_minmaxvalue<uint32_t>("CHUNKS_LOOP_MIN_TIME", 300, MINLOOPTIME, MAXLOOPTIME);
		HashCPS = cfg_get_minmaxvalue<uint32_t>("CHUNKS_LOOP_MAX_CPS", 100000, MINCPS, MAXCPS);
		uint64_t scaled_looptime = std::max((uint64_t)1000 * looptime / ChunksLoopPeriod, (uint64_t)1);
		HashSteps = 1 + ((HASHSIZE) / scaled_looptime);
		HashCPS = (uint64_t)ChunksLoopPeriod * HashCPS / 1000;
	}
	double endangeredChunksPriority = cfg_ranged_get("ENDANGERED_CHUNKS_PRIORITY", 0.0, 0.0, 1.0);
	gEndangeredChunksServingLimit = HashSteps * endangeredChunksPriority;
	gEndangeredChunksMaxCapacity = cfg_get("ENDANGERED_CHUNKS_MAX_CAPACITY", static_cast<uint64_t>(1024 * 1024UL));
	gAcceptableDifference = cfg_ranged_get("ACCEPTABLE_DIFFERENCE", 0.1, 0.001, 10.0);
	RebalancingBetweenLabels = cfg_getuint32("CHUNKS_REBALANCING_BETWEEN_LABELS", 0) == 1;
	eventloop_reloadregister(chunk_reload);
	metadataserver::registerFunctionCalledOnPromotion(chunk_become_master);
	eventloop_eachloopregister(chunk_clean_zombie_servers_a_bit);
	if (metadataserver::isMaster()) {
		chunk_become_master();
	}
#endif
	return 1;
}