1 /*
2 * Copyright (C) 2021 Jakub Kruszona-Zawadzki, Core Technology Sp. z o.o.
3 *
4 * This file is part of MooseFS.
5 *
6 * MooseFS is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, version 2 (only).
9 *
10 * MooseFS is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with MooseFS; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02111-1301, USA
18 * or visit http://www.gnu.org/licenses/gpl-2.0.html
19 */
20
/*
 * Switches defined ahead of the #include lines below
 * (presumably consumed by project headers such as buckets.h - TODO confirm).
 */
#define BUCKETS_MMAP_ALLOC 1
#define HASHTAB_PREALLOC 1

/* chunk_hash_move() keeps migrating buckets until it has examined at least this many chunks */
#define CHUNKHASH_MOVEFACTOR 5
24
25 #ifdef HAVE_CONFIG_H
26 #include "config.h"
27 #endif
28
29 #include <inttypes.h>
30 #include <stdlib.h>
31 #include <stdarg.h>
32 #include <fcntl.h>
33 #include <unistd.h>
34 #include <syslog.h>
35 #include <stdio.h>
36 #include <string.h>
37 #include <sys/types.h>
38 #include <sys/stat.h>
39
40 #include "MFSCommunication.h"
41
42 #include "main.h"
43 #include "cfg.h"
44 #include "bio.h"
45 #include "metadata.h"
46 #include "matocsserv.h"
47 #include "matoclserv.h"
48 #include "changelog.h"
49 #include "csdb.h"
50 #include "random.h"
51 #include "topology.h"
52 #include "chunks.h"
53 #include "filesystem.h"
54 #include "datapack.h"
55 #include "massert.h"
56 #include "hashfn.h"
57 #include "buckets.h"
58 #include "clocks.h"
59 #include "storageclass.h"
60
/* numeric limits; their consumers are outside the part of the file visible here */
#define MINLOOPTIME 60
#define MAXLOOPTIME 7200
#define MAXCPS 10000000
#define MINCPS 10000
#define TICKSPERSECOND 50

/* presumably the initial values of csdata.newchunkdelay / csdata.lostchunkdelay - TODO confirm at the use sites */
#define NEWCHUNKDELAY 150
#define LOSTCHUNKDELAY 50

/*
 * Geometry of the two-level chunk hash table (chunkhashtab):
 *  - the low HASHTAB_LOBITS bits of a slot number index an entry inside one second-level table,
 *  - the remaining high bits select the second-level table,
 *  - HASHTAB_HISIZE is the number of second-level tables, HASHTAB_LOSIZE the entries in each.
 */
#define HASHTAB_LOBITS 24
#define HASHTAB_HISIZE (0x80000000>>(HASHTAB_LOBITS))
#define HASHTAB_LOSIZE (1<<HASHTAB_LOBITS)
#define HASHTAB_MASK (HASHTAB_LOSIZE-1)
#define HASHTAB_MOVEFACTOR 5
75
76 // #define DISCLOOPRATIO 0x400
77
78 //#define HASHSIZE 0x100000
79 //#define HASHPOS(chunkid) (((uint32_t)chunkid)&0xFFFFF)
80
81 //#define DISCLOOPELEMENTS (HASHSIZE/0x400)
82
83
/* job phases (the code consuming them is outside the part of the file visible here) */
enum {
	JOBS_INIT,
	JOBS_EVERYLOOP,
	JOBS_EVERYTICK,
	JOBS_TERM
};

/* chunk.operation */
enum {
	NONE = 0,
	CREATE,
	SET_VERSION,
	DUPLICATE,
	TRUNCATE,
	DUPTRUNC,
	REPLICATE
};

/*
 * Printable names in the same order as the chunk.operation constants above.
 * chunk.operation is a 3-bit field, so the trailing "???" entry covers the
 * single value (7) that has no enum constant.
 */
static const char* opstr[] = {
	"NONE",
	"CREATE",
	"SET_VERSION",
	"DUPLICATE",
	"TRUNCATE",
	"DUPLICATE+TRUNCATE",
	"REPLICATE",
	"???"
};
99
/* slist.valid - state of a single chunk copy */
enum {
	INVALID,	/* got info from chunkserver (IO error etc.) -> to delete */
	DEL,		/* deletion in progress */
	BUSY,		/* operation in progress */
	VALID,		/* ok */
	WVER,		/* wrong version - repair or delete */
	TDBUSY,		/* to be deleted + operation in progress */
	TDVALID,	/* ok, to be deleted */
	TDWVER		/* wrong version, to be deleted */
};

#ifdef MFSDEBUG
/* printable names in the same order as the slist.valid constants (debug builds only) */
static const char* validstr[] = {
	"INVALID",
	"DELETE",
	"BUSY",
	"VALID",
	"WVER",
	"TDBUSY",
	"TDVALID",
	"TDWVER"
};
#endif
123
/* singly linked list node carrying one chunkserver id */
typedef struct _discserv {
	uint16_t csid;			/* chunkserver id */
	struct _discserv *next;		/* next node or NULL */
} discserv;

/*
 * Two list heads; chunk_counters_in_progress() reports bit 0 while either of them is non-empty.
 * (The code filling and draining them is outside the part of the file visible here.)
 */
static discserv *discservers = NULL;
static discserv *discservers_next = NULL;
131
132 /*
133 typedef struct _bcdata {
134 void *ptr;
135 uint32_t version;
136 } bcdata;
137 */
138 /*
139 typedef struct _hintlist {
140 uint32_t ip;
141 uint16_t port;
142 struct _hintlist *next;
143 } hintlist;
144 */
/* one copy of a chunk kept on one chunkserver; nodes are chained from chunk.slisthead */
typedef struct _slist {
	uint16_t csid;		/* chunkserver id (used as an index into cstab) */
	uint8_t valid;		/* one of the slist.valid constants (INVALID ... TDWVER) */
	uint32_t version;	/* version of this copy */
	struct _slist *next;	/* next copy of the same chunk or NULL */
} slist;
151
152 /*
153 #define SLIST_BUCKET_SIZE 5000
154
155 typedef struct _slist_bucket {
156 slist bucket[SLIST_BUCKET_SIZE];
157 uint32_t firstfree;
158 struct _slist_bucket *next;
159 } slist_bucket;
160
161 static slist_bucket *sbhead = NULL;
162 static slist *slfreehead = NULL;
163 */
164
165 typedef struct chunk {
166 uint64_t chunkid;
167 uint32_t version;
168 uint8_t sclassid;
169 uint8_t allvalidcopies;
170 uint8_t regularvalidcopies;
171 unsigned ondangerlist:1;
172 unsigned needverincrease:1;
173 unsigned interrupted:1;
174 unsigned writeinprogress:1;
175 unsigned archflag:1;
176 unsigned operation:3;
177 uint32_t lockedto;
178 uint32_t fcount;
179 slist *slisthead;
180 uint32_t *ftab;
181 struct chunk *next;
182 } chunk;
183
184 /*
185 #define CHUNK_BUCKET_SIZE 20000
186 typedef struct _chunk_bucket {
187 chunk bucket[CHUNK_BUCKET_SIZE];
188 uint32_t firstfree;
189 struct _chunk_bucket *next;
190 } chunk_bucket;
191
192 static chunk_bucket *cbhead = NULL;
193 static chunk *chfreehead = NULL;
194 */
195 static chunk **chunkhashtab[HASHTAB_HISIZE];
196 static uint32_t chunkrehashpos;
197 static uint32_t chunkhashsize;
198 static uint32_t chunkhashelem;
199
200 static uint64_t nextchunkid=1;
201 #define LOCKTIMEOUT 120
202
203 #define UNUSED_DELETE_TIMEOUT (86400*7)
204
/* one pending chunk operation on a chunkserver; nodes are chained from csdata.opchunks */
typedef struct _csopchunk {
	uint64_t chunkid;		/* chunk the operation refers to */
	uint8_t status;			/* MFS_ERROR_MISMATCH until chunk_statusopchunk() stores a result */
	struct _csopchunk *next;	/* next pending operation or NULL */
} csopchunk;
210
/* csdata.mfr_state */
enum {
	UNKNOWN_HARD,		/* unknown after disconnect or creation */
	UNKNOWN_SOFT,		/* unknown, loop in progress */
	CAN_BE_REMOVED,		/* can be removed, whole loop has passed */
	REPL_IN_PROGRESS,	/* chunks still need to be replicated, can't be removed */
	WAS_IN_PROGRESS		/* was in REPL_IN_PROGRESS during previous loop */
};
218
219 // state automaton:
220 //
221 // 1 - Chunk with number of valid copies less than goal (servers with chunks marked for removal only)
222 // 2 - Chunkserver disconnection (all servers)
223 // 3 - Loop end (all servers)
224 //
225 // 1 2 3
226 // UNKNOWN_HARD | REPL_IN_PROGRESS | UNKNOWN_HARD | UNKNOWN_SOFT
227 // UNKNOWN_SOFT | REPL_IN_PROGRESS | UNKNOWN_HARD | CAN_BE_REMOVED
228 // CAN_BE_REMOVED | REPL_IN_PROGRESS | UNKNOWN_HARD | CAN_BE_REMOVED
229 // REPL_IN_PROGRESS | REPL_IN_PROGRESS | UNKNOWN_HARD | WAS_IN_PROGRESS
230 // WAS_IN_PROGRESS | REPL_IN_PROGRESS | UNKNOWN_HARD | CAN_BE_REMOVED
231 //
232 // UNKNOWN_HARD,UNKNOWN_SOFT - Unknown state
233 // CAN_BE_REMOVED - Can be removed
234 // REPL_IN_PROGRESS,WAS_IN_PROGRESS - In progress
235
236 typedef struct _csdata {
237 void *ptr;
238 csopchunk *opchunks;
239 uint8_t valid;
240 unsigned registered:1;
241 unsigned mfr_state:3;
242 uint8_t newchunkdelay;
243 uint8_t lostchunkdelay;
244 uint32_t next;
245 uint32_t prev;
246 } csdata;
247
248 static csdata *cstab = NULL;
249 static uint32_t csfreehead = MAXCSCOUNT;
250 static uint32_t csfreetail = MAXCSCOUNT;
251 static uint32_t csusedhead = MAXCSCOUNT;
252 static uint32_t opsinprogress = 0;
253 static uint16_t csregisterinprogress = 0;
254 static uint8_t csreceivingchunks = 0;
255
256 #define DANGER_PRIORITIES 7
257 #define REPLICATION_DANGER_PRIORITIES 6
258
259 #define DPRIORITY_ENDANGERED_HIGHGOAL 0
260 #define DPRIORITY_ENDANGERED 1
261 #define DPRIORITY_UNDERGOAL_MFR 2
262 #define DPRIORITY_MFR 3
263 #define DPRIORITY_UNDERGOAL 4
264 #define DPRIORITY_WRONGLABEL 5
265 #define DPRIORITY_OVERGOAL 6
266
267 static chunk** chunks_priority_queue[DANGER_PRIORITIES];
268 static uint32_t chunks_priority_leng[DANGER_PRIORITIES];
269 static uint32_t chunks_priority_head[DANGER_PRIORITIES];
270 static uint32_t chunks_priority_tail[DANGER_PRIORITIES];
271
272 static double chunks_priority_mincpsperc[DANGER_PRIORITIES] = {1.0,0.1,0.1,0.01,0.05,0.01,0.3};
273
/* settings and loop state; most of them are written by code outside the part of the file visible here */
static uint32_t ReplicationsDelayInit = 60;
/* capacity of every priority ring buffer (see chunk_priority_enqueue) */
static uint32_t DangerMaxLeng = 1000000;

static double MaxWriteRepl[4];
static double MaxReadRepl[4];
static uint32_t MaxDelSoftLimit;
static uint32_t MaxDelHardLimit;
static double TmpMaxDelFrac;
static uint32_t TmpMaxDel;
static uint8_t ReplicationsRespectTopology;
/* when non-zero: servers closer to the client than this distance are preferred by chunk_creation_servers() */
static uint32_t CreationsRespectTopology;
static uint32_t LoopTimeMin;
//static uint32_t HashSteps;
static uint32_t HashCPTMax;
static double AcceptableDifference;

/* choose how do_perfect_match() groups servers: by IP, by rack id, or by masked label bits */
static uint8_t DoNotUseSameIP;
static uint8_t DoNotUseSameRack;
static uint32_t LabelUniqueMask;

static uint32_t jobshpos;
static uint32_t jobshstep;
static uint32_t jobshcnt;
static uint32_t jobshmax;

static uint32_t starttime;
300
301 typedef struct _job_info {
302 uint32_t del_invalid;
303 uint32_t del_unused;
304 uint32_t del_diskclean;
305 uint32_t del_overgoal;
306 uint32_t copy_undergoal;
307 uint32_t copy_wronglabels;
308 } job_info;
309
310 typedef struct _loop_info {
311 job_info done,notdone;
312 uint32_t locked_unused;
313 uint32_t locked_used;
314 uint32_t copy_rebalance;
315 uint32_t labels_dont_match;
316 } loop_info;
317
318 static loop_info chunksinfo = {{0,0,0,0,0,0},{0,0,0,0,0,0},0,0,0,0};
319 static uint32_t chunksinfo_loopstart=0,chunksinfo_loopend=0;
320
321 static uint64_t lastchunkid=0;
322 static chunk* lastchunkptr=NULL;
323
324 static uint32_t chunks;
325
326 static uint32_t last_rebalance=0;
327
328 static uint32_t **allchunkcounts;
329 static uint32_t **regularchunkcounts;
330
331 static uint32_t stats_chunkops[12];
332
333
334 #ifdef MFSDEBUG
/*
 * Debug logger (MFSDEBUG builds only).
 *
 * format==NULL : control call - opens "mfsdebug.txt" in append mode on the first
 *                such call, flushes the already opened file on later ones.
 * format!=NULL : appends one line "<main_time()> : <formatted message>";
 *                silently does nothing while the file is not open.
 */
static void mfsdebug(const char *format,...) {
	static FILE *logfile = NULL;
	va_list args;

	if (format==NULL) {
		if (logfile!=NULL) {
			fflush(logfile);
		} else {
			logfile = fopen("mfsdebug.txt","a");
		}
		return;
	}
	if (logfile==NULL) {
		return;
	}
	fprintf(logfile,"%"PRIu32" : ",(uint32_t)(main_time()));
	va_start(args,format);
	vfprintf(logfile,format,args);
	va_end(args);
	fputc('\n',logfile);
}

/* public wrapper around the control call of mfsdebug(): open the debug file or flush it */
void mfsdebug_flush(void) {
	mfsdebug(NULL);
}
357 #endif
358
359 // input:
360 // - number of desired copies -> labelcnt
361 // - definition of desired labels -> labelmasks
362 // - number of available servers -> servcnt
363 // - pointers to servers -> servers
364 // output:
365 // [0 .. labelscnt-1] -> matching server index (+labelcnt)
366 // [labelcnt .. labelcnt+servcnt-1] -> matching label definition
367 // example:
368 // 3,6,[A,A,A],[B,A,A,C,A,A] -> [4,5,7,-1,0,1,-1,2,-1,-1]
369
do_perfect_match(uint32_t labelcnt,uint32_t servcnt,uint32_t ** labelmasks,uint16_t * servers)370 int32_t* do_perfect_match(uint32_t labelcnt,uint32_t servcnt,uint32_t **labelmasks,uint16_t *servers) {
371 uint32_t s,i,l,x,v,vi,sp,rsp,sid,sids,gr;
372 int32_t t;
373 static int32_t *imatching = NULL;
374 static int32_t *matching = NULL;
375 static int32_t *augment = NULL;
376 static uint8_t *visited = NULL;
377 static int32_t *stack = NULL;
378 static int32_t *group = NULL;
379 static int32_t *grnode = NULL;
380 static uint32_t *sidval = NULL;
381 static int32_t *sidpos = NULL;
382 static uint32_t tablength = 0;
383 static uint32_t stablength = 0;
384
385 if (labelcnt + servcnt > tablength || imatching==NULL || matching==NULL || augment==NULL || visited==NULL || stack==NULL) {
386 tablength = 100 + 2 * (labelcnt + servcnt);
387 if (imatching) {
388 free(imatching);
389 }
390 if (matching) {
391 free(matching);
392 }
393 if (augment) {
394 free(augment);
395 }
396 if (visited) {
397 free(visited);
398 }
399 if (stack) {
400 free(stack);
401 }
402 imatching = malloc(sizeof(int32_t)*tablength);
403 passert(imatching);
404 matching = malloc(sizeof(int32_t)*tablength);
405 passert(matching);
406 augment = malloc(sizeof(int32_t)*tablength);
407 passert(augment);
408 visited = malloc(sizeof(uint8_t)*tablength);
409 passert(visited);
410 stack = malloc(sizeof(int32_t)*tablength);
411 passert(stack);
412 }
413 if (servcnt > stablength || sidval==NULL || sidpos==NULL || grnode==NULL || group==NULL) {
414 stablength = 100 + 2 * servcnt;
415 if (sidval) {
416 free(sidval);
417 }
418 if (sidpos) {
419 free(sidpos);
420 }
421 if (grnode) {
422 free(grnode);
423 }
424 if (group) {
425 free(group);
426 }
427 sidval = malloc(sizeof(uint32_t)*stablength);
428 passert(sidval);
429 sidpos = malloc(sizeof(int32_t)*stablength);
430 passert(sidpos);
431 grnode = malloc(sizeof(int32_t)*stablength);
432 passert(grnode);
433 group = malloc(sizeof(int32_t)*stablength);
434 passert(group);
435 }
436
437 s = servcnt + labelcnt;
438 if (s==0) {
439 return matching;
440 }
441
442 for (i=0 ; i<s ; i++) {
443 imatching[i] = -1;
444 matching[i] = -1;
445 augment[i] = -1;
446 }
447
448 if (servcnt==0 || labelcnt==0) {
449 return matching;
450 }
451
452 sp = 0;
453
454 // calc groups
455 sids = 0;
456 for (i=0 ; i<servcnt ; i++) {
457 if (DoNotUseSameIP) {
458 sid = matocsserv_server_get_ip(cstab[servers[i]].ptr);
459 } else if (DoNotUseSameRack) {
460 sid = topology_get_rackid(matocsserv_server_get_ip(cstab[servers[i]].ptr));
461 } else {
462 sid = matocsserv_server_get_labelmask(cstab[servers[i]].ptr) & LabelUniqueMask;
463 }
464 if (sid==0) {
465 group[i] = i;
466 } else {
467 for (l=0 ; l<sids && sid!=sidval[l] ; l++) { }
468 if (l==sids) {
469 sidval[l] = sid;
470 sidpos[l] = i;
471 sids++;
472 }
473 group[i] = sidpos[l];
474 }
475 }
476
477 for (l=0 ; l<labelcnt ; l++) {
478 if (imatching[l]==-1) {
479 for (i=0 ; i<servcnt+labelcnt ; i++) {
480 visited[i] = 0;
481 }
482 visited[l] = 1;
483 augment[l] = -1;
484 stack[sp++] = l; // push
485 while (sp!=0) { // stack not empty
486 x = stack[--sp]; // pop
487 if (x<labelcnt) {
488 rsp = sp;
489 for (v=0 ; v<servcnt ; v++) {
490 #ifdef MATCH_LEFT
491 vi = v;
492 #else
493 vi = servcnt-1-v;
494 #endif
495 gr = group[vi];
496 if (visited[labelcnt+gr]==0) {
497 if (matocsserv_server_has_labels(cstab[servers[vi]].ptr,labelmasks[x])) {
498 visited[labelcnt+gr] = 1;
499 augment[labelcnt+gr] = x;
500 grnode[gr] = vi;
501 stack[sp++] = labelcnt + gr; // push
502 }
503 }
504 }
505 // reverse stack from rsp to sp
506 for (v=0 ; v<(sp-rsp)/2 ; v++) {
507 // swap stack[rsp+v] , stack[sp-1-v]
508 t = stack[sp-1-v];
509 stack[sp-1-v] = stack[rsp+v];
510 stack[rsp+v] = t;
511 }
512 } else if (imatching[x] >= 0) {
513 augment[imatching[x]] = x;
514 visited[imatching[x]] = 1;
515 stack[sp++] = imatching[x];
516 } else {
517 while (augment[x]>=0) {
518 if (x>=labelcnt) {
519 imatching[x] = augment[x];
520 imatching[augment[x]] = x;
521 matching[augment[x]] = grnode[x-labelcnt] + labelcnt;
522 }
523 x = augment[x];
524 }
525 sp = 0; // clear stack;
526 }
527 }
528 }
529 }
530
531 // add reverse matching
532 for (i=0 ; i<labelcnt ; i++) {
533 t = matching[i];
534 if (t>=0) {
535 matching[t] = i;
536 }
537 }
538 return matching;
539 }
540
chunk_stats(uint32_t chunkops[12])541 void chunk_stats(uint32_t chunkops[12]) {
542 uint32_t i;
543 for (i=0 ; i<12 ; i++) {
544 chunkops[i] = stats_chunkops[i];
545 stats_chunkops[i] = 0;
546 }
547 }
548
549 CREATE_BUCKET_ALLOCATOR(slist,slist,10000000/sizeof(slist))
550
551 CREATE_BUCKET_ALLOCATOR(chunk,chunk,10000000/sizeof(chunk))
552
chunk_get_memusage(uint64_t allocated[3],uint64_t used[3])553 void chunk_get_memusage(uint64_t allocated[3],uint64_t used[3]) {
554 allocated[0] = sizeof(chunk*)*chunkrehashpos;
555 used[0] = sizeof(chunk*)*chunkhashelem;
556 chunk_getusage(allocated+1,used+1);
557 slist_getusage(allocated+2,used+2);
558 }
559
560 /*
561 static inline uint32_t chunk_calc_hash_size(uint32_t elements) {
562 uint32_t res=1;
563 while (elements) {
564 elements>>=1;
565 res<<=1;
566 }
567 if (res==0) {
568 res = UINT32_C(0x80000000);
569 }
570 if (res<HASHTAB_LOSIZE) {
571 return HASHTAB_LOSIZE;
572 }
573 return res;
574 }
575 */
576
chunk_hash_init(void)577 static inline void chunk_hash_init(void) {
578 uint16_t i;
579 chunkhashsize = 0;
580 chunkhashelem = 0;
581 chunkrehashpos = 0;
582 for (i=0 ; i<HASHTAB_HISIZE ; i++) {
583 chunkhashtab[i] = NULL;
584 }
585 }
586
chunk_hash_cleanup(void)587 static inline void chunk_hash_cleanup(void) {
588 uint16_t i;
589 chunkhashelem = 0;
590 chunkhashsize = 0;
591 chunkrehashpos = 0;
592 for (i=0 ; i<HASHTAB_HISIZE ; i++) {
593 if (chunkhashtab[i]!=NULL) {
594 #ifdef HAVE_MMAP
595 munmap(chunkhashtab[i],sizeof(chunk*)*HASHTAB_LOSIZE);
596 #else
597 free(chunkhashtab[i]);
598 #endif
599 }
600 chunkhashtab[i] = NULL;
601 }
602 }
603
chunk_hash_rehash(void)604 static inline void chunk_hash_rehash(void) {
605 uint16_t i;
606 chunkrehashpos = chunkhashsize;
607 chunkhashsize *= 2;
608 for (i=(chunkhashsize>>HASHTAB_LOBITS)/2 ; i<chunkhashsize>>HASHTAB_LOBITS ; i++) {
609 #ifdef HAVE_MMAP
610 chunkhashtab[i] = mmap(NULL,sizeof(chunk*)*HASHTAB_LOSIZE,PROT_READ | PROT_WRITE, MAP_ANON | MAP_PRIVATE,-1,0);
611 #else
612 chunkhashtab[i] = malloc(sizeof(chunk*)*HASHTAB_LOSIZE);
613 #endif
614 passert(chunkhashtab[i]);
615 }
616 }
617
chunk_hash_move(void)618 static inline void chunk_hash_move(void) {
619 uint32_t hash;
620 uint32_t mask;
621 uint32_t moved=0;
622 chunk **chptr,**chptralt,*c;
623 mask = chunkhashsize-1;
624 do {
625 if (chunkrehashpos>=chunkhashsize) { // rehash complete
626 chunkrehashpos = chunkhashsize;
627 return;
628 }
629 chptr = chunkhashtab[(chunkrehashpos - (chunkhashsize/2)) >> HASHTAB_LOBITS] + (chunkrehashpos & HASHTAB_MASK);
630 chptralt = chunkhashtab[chunkrehashpos >> HASHTAB_LOBITS] + (chunkrehashpos & HASHTAB_MASK);
631 *chptralt = NULL;
632 while ((c=*chptr)!=NULL) {
633 hash = hash32(c->chunkid) & mask;
634 if (hash==chunkrehashpos) {
635 *chptralt = c;
636 *chptr = c->next;
637 chptralt = &(c->next);
638 c->next = NULL;
639 } else {
640 chptr = &(c->next);
641 }
642 moved++;
643 }
644 chunkrehashpos++;
645 } while (moved<CHUNKHASH_MOVEFACTOR);
646 }
647
chunk_hash_find(uint64_t chunkid)648 static inline chunk* chunk_hash_find(uint64_t chunkid) {
649 chunk *c;
650 uint32_t hash;
651
652 if (chunkhashsize==0) {
653 return NULL;
654 }
655 hash = hash32(chunkid) & (chunkhashsize-1);
656 if (chunkrehashpos<chunkhashsize) {
657 chunk_hash_move();
658 if (hash >= chunkrehashpos) {
659 hash -= chunkhashsize/2;
660 }
661 }
662 for (c=chunkhashtab[hash>>HASHTAB_LOBITS][hash&HASHTAB_MASK] ; c ; c=c->next) {
663 if (c->chunkid==chunkid) {
664 return c;
665 }
666 }
667 return NULL;
668 }
669
/*
 * Unlinks chunk c from its hash bucket (the record itself is not freed).
 * Does nothing when the table is empty or c is not found in its bucket.
 * While a rehash is in progress each call also advances it by one step.
 */
static inline void chunk_hash_delete(chunk *c) {
	uint32_t slot;
	chunk **link;

	if (chunkhashsize==0) {
		return;
	}
	slot = hash32(c->chunkid) & (chunkhashsize-1);
	if (chunkrehashpos<chunkhashsize) {
		chunk_hash_move();
		if (slot >= chunkrehashpos) {
			slot -= chunkhashsize/2;
		}
	}
	for (link = chunkhashtab[slot>>HASHTAB_LOBITS] + (slot&HASHTAB_MASK) ; *link!=NULL ; link = &((*link)->next)) {
		if (*link==c) {
			*link = c->next;
			chunkhashelem--;
			return;
		}
	}
}
694
/*
 * Inserts chunk c at the head of its hash bucket.
 *
 * The first insertion creates the initial table. While a rehash is in progress
 * the call advances it by one step; otherwise, once there are more chunks than
 * buckets (and the table can still grow), a new rehash is started.
 */
static inline void chunk_hash_add(chunk *c) {
	uint16_t i;
	uint32_t slot,pos;
	uint8_t rehashing;
	chunk **bucket;

	if (chunkhashsize==0) {
		chunkhashsize = HASHTAB_LOSIZE; //chunk_calc_hash_size(maxnodeid);
		chunkrehashpos = chunkhashsize;
		chunkhashelem = 0;
		for (i=0 ; i<chunkhashsize>>HASHTAB_LOBITS ; i++) {
#ifdef HAVE_MMAP
			chunkhashtab[i] = mmap(NULL,sizeof(chunk*)*HASHTAB_LOSIZE,PROT_READ | PROT_WRITE, MAP_ANON | MAP_PRIVATE,-1,0);
#else
			chunkhashtab[i] = malloc(sizeof(chunk*)*HASHTAB_LOSIZE);
#endif
			passert(chunkhashtab[i]);
			// probe one entry: when zero bytes do not read back as NULL, fall back to explicit assignments
			memset(chunkhashtab[i],0,sizeof(chunk*));
			if (chunkhashtab[i][0]!=NULL) {
				for (pos=0 ; pos<HASHTAB_LOSIZE ; pos++) {
					chunkhashtab[i][pos] = NULL;
				}
			} else {
				memset(chunkhashtab[i],0,sizeof(chunk*)*HASHTAB_LOSIZE);
			}
		}
	}

	slot = hash32(c->chunkid) & (chunkhashsize-1);
	// must be sampled before chunk_hash_move(), which may finish the rehash
	rehashing = (chunkrehashpos<chunkhashsize)?1:0;
	if (rehashing) {
		chunk_hash_move();
		if (slot >= chunkrehashpos) { // bucket not created yet - use its parent
			slot -= chunkhashsize/2;
		}
	}

	bucket = chunkhashtab[slot>>HASHTAB_LOBITS] + (slot&HASHTAB_MASK);
	c->next = *bucket;
	*bucket = c;
	chunkhashelem++;

	if (!rehashing && chunkhashelem>chunkhashsize && (chunkhashsize>>HASHTAB_LOBITS)<HASHTAB_HISIZE) {
		chunk_hash_rehash();
	}
}
738
chunk_new(uint64_t chunkid)739 chunk* chunk_new(uint64_t chunkid) {
740 chunk *newchunk;
741 newchunk = chunk_malloc();
742 //#ifdef METARESTORE
743 // printf("N%"PRIu64"\n",chunkid);
744 //#endif
745 chunks++;
746 allchunkcounts[0][0]++;
747 regularchunkcounts[0][0]++;
748 newchunk->chunkid = chunkid;
749 newchunk->version = 0;
750 newchunk->sclassid = 0;
751 newchunk->lockedto = 0;
752 newchunk->allvalidcopies = 0;
753 newchunk->regularvalidcopies = 0;
754 newchunk->needverincrease = 1;
755 newchunk->ondangerlist = 0;
756 newchunk->interrupted = 0;
757 newchunk->writeinprogress = 0;
758 newchunk->archflag = 0;
759 newchunk->operation = NONE;
760 newchunk->slisthead = NULL;
761 newchunk->fcount = 0;
762 // newchunk->flisthead = NULL;
763 newchunk->ftab = NULL;
764 lastchunkid = chunkid;
765 lastchunkptr = newchunk;
766 chunk_hash_add(newchunk);
767 return newchunk;
768 }
769
chunk_find(uint64_t chunkid)770 chunk* chunk_find(uint64_t chunkid) {
771 chunk *c;
772 //#ifdef METARESTORE
773 // printf("F%"PRIu64"\n",chunkid);
774 //#endif
775 if (lastchunkid==chunkid) {
776 return lastchunkptr;
777 }
778 c = chunk_hash_find(chunkid);
779 if (c) {
780 lastchunkid = chunkid;
781 lastchunkptr = c;
782 }
783 return c;
784 }
785
/*
 * Destroys a chunk record: drops it from the lookup cache, decrements the chunk
 * count and column 0 of both count matrices for its storage class row, removes
 * it from the hash table and returns the record to the allocator.
 */
void chunk_delete(chunk* c) {
	uint32_t row;

	if (c==lastchunkptr) {
		lastchunkptr = NULL;
		lastchunkid = 0;
	}
	chunks--;
	row = c->sclassid;
	if (c->archflag) {
		row += MAXSCLASS;
	}
	allchunkcounts[row][0]--;
	regularchunkcounts[row][0]--;
	chunk_hash_delete(c);
	chunk_free(c);
}
799
chunk_state_change(uint8_t oldsclassid,uint8_t newsclassid,uint8_t oldarchflag,uint8_t newarchflag,uint8_t oldavc,uint8_t newavc,uint8_t oldrvc,uint8_t newrvc)800 static inline void chunk_state_change(uint8_t oldsclassid,uint8_t newsclassid,uint8_t oldarchflag,uint8_t newarchflag,uint8_t oldavc,uint8_t newavc,uint8_t oldrvc,uint8_t newrvc) {
801 uint32_t oldindx = oldsclassid + (oldarchflag?MAXSCLASS:0);
802 uint32_t newindx = newsclassid + (newarchflag?MAXSCLASS:0);
803 if (oldavc>9) {
804 oldavc=10;
805 }
806 if (newavc>9) {
807 newavc=10;
808 }
809 if (oldrvc>9) {
810 oldrvc=10;
811 }
812 if (newrvc>9) {
813 newrvc=10;
814 }
815 allchunkcounts[oldindx][oldavc]--;
816 allchunkcounts[newindx][newavc]++;
817 regularchunkcounts[oldindx][oldrvc]--;
818 regularchunkcounts[newindx][newrvc]++;
819 }
820
chunk_count(void)821 uint32_t chunk_count(void) {
822 return chunks;
823 }
824
chunk_sclass_counters(uint8_t sclassid,uint8_t archflag,uint8_t goal,uint64_t * undergoal,uint64_t * exactgoal,uint64_t * overgoal)825 void chunk_sclass_counters(uint8_t sclassid,uint8_t archflag,uint8_t goal,uint64_t *undergoal,uint64_t *exactgoal,uint64_t *overgoal) {
826 uint32_t indx = sclassid + (archflag?MAXSCLASS:0);
827 uint32_t i;
828
829 *undergoal = 0;
830 *exactgoal = 0;
831 *overgoal = 0;
832 for (i=0 ; i<11 ; i++) {
833 if (i<goal) {
834 *undergoal += regularchunkcounts[indx][i];
835 } else if (i>goal) {
836 *overgoal += regularchunkcounts[indx][i];
837 } else {
838 *exactgoal = regularchunkcounts[indx][i];
839 }
840 }
841 }
842
chunk_info(uint32_t * allchunks,uint32_t * allcopies,uint32_t * regularvalidcopies)843 void chunk_info(uint32_t *allchunks,uint32_t *allcopies,uint32_t *regularvalidcopies) {
844 uint32_t i,j,ag,rg;
845 *allchunks = chunks;
846 *allcopies = 0;
847 *regularvalidcopies = 0;
848 for (i=1 ; i<=10 ; i++) {
849 ag=0;
850 rg=0;
851 for (j=0 ; j<MAXSCLASS*2 ; j++) {
852 ag += allchunkcounts[j][i];
853 rg += regularchunkcounts[j][i];
854 }
855 *allcopies += ag*i;
856 *regularvalidcopies += rg*i;
857 }
858 }
859
chunk_get_missing_count(void)860 uint32_t chunk_get_missing_count(void) {
861 uint32_t res=0;
862 uint32_t i;
863
864 for (i=1 ; i<MAXSCLASS*2 ; i++) {
865 res+=allchunkcounts[i][0];
866 }
867 return res;
868 }
869
chunk_counters_in_progress(void)870 uint8_t chunk_counters_in_progress(void) {
871 // syslog(LOG_NOTICE,"discservers: %p , discservers_next: %p , csregisterinprogress: %"PRIu16,discservers,discservers_next,csregisterinprogress);
872 return ((discservers!=NULL || discservers_next!=NULL)?1:0)|((csregisterinprogress>0)?2:0)|csreceivingchunks;
873 }
874
chunk_store_chunkcounters(uint8_t * buff,uint8_t matrixid)875 void chunk_store_chunkcounters(uint8_t *buff,uint8_t matrixid) {
876 uint32_t i,j,indx;
877 uint32_t counts[11][11];
878
879 for (i=0 ; i<=10 ; i++) {
880 for (j=0 ; j<=10 ; j++) {
881 counts[i][j]=0;
882 }
883 }
884
885 if (matrixid==0) {
886 for (i=0 ; i<MAXSCLASS*2 ; i++) {
887 indx = sclass_get_keeparch_goal(i%MAXSCLASS,i/MAXSCLASS);
888 if (indx>10) {
889 indx=10;
890 }
891 for (j=0 ; j<=10 ; j++) {
892 counts[indx][j] += allchunkcounts[i][j];
893 }
894 }
895 } else if (matrixid==1) {
896 for (i=0 ; i<MAXSCLASS*2 ; i++) {
897 indx = sclass_get_keeparch_goal(i%MAXSCLASS,i/MAXSCLASS);
898 if (indx>10) {
899 indx=10;
900 }
901 for (j=0 ; j<=10 ; j++) {
902 counts[indx][j] += regularchunkcounts[i][j];
903 }
904 }
905 }
906
907 for (i=0 ; i<=10 ; i++) {
908 for (j=0 ; j<=10 ; j++) {
909 put32bit(&buff,counts[i][j]);
910 }
911 }
912 }
913
914 /* --- */
915
/*
 * Puts chunk c into the ring buffer of priority j (no-op when the chunk is
 * already marked as queued). The ring holds at most DangerMaxLeng entries:
 * when it is full the entry at the head position is overwritten (its
 * ondangerlist flag is cleared first) and the tail is moved along with the head.
 */
static inline void chunk_priority_enqueue(uint8_t j,chunk *c) {
	chunk **ring;
	uint32_t used,slot,nexthead;
	uint8_t full;

	if (c->ondangerlist) {
		return;
	}
	ring = chunks_priority_queue[j];
	used = chunks_priority_leng[j];
	slot = chunks_priority_head[j];
	full = (used>=DangerMaxLeng)?1:0;

	if (full && ring[slot]!=NULL) {
		ring[slot]->ondangerlist = 0; // evicted entry is no longer queued
	}
	ring[slot] = c;
	c->ondangerlist = 1;

	nexthead = (slot+1)%DangerMaxLeng;
	chunks_priority_head[j] = nexthead;
	if (full) {
		chunks_priority_tail[j] = nexthead;
	} else {
		chunks_priority_leng[j] = used+1;
	}
}
938
/*
 * Decides whether chunk c needs attention and, if so, puts it into the proper
 * priority queue.
 *
 * Control calls (c==NULL):
 *   checklabels==0 - allocate the internal server table (if not allocated yet),
 *   checklabels==1 - release it; the pointer is reset, so later checks are
 *                    ignored and a following init call allocates it again.
 *
 * Regular calls (c!=NULL):
 *   The chunk is ignored when it is already queued, the server table is not
 *   allocated, its sclassid or fcount is zero, or lockedto+3600 has not yet
 *   passed main_time(). Otherwise copies in VALID (vc) and TDVALID (tdc) state
 *   are counted and compared with the keep/arch goal of the storage class.
 *   With checklabels set and vc>=goal the VALID copies are additionally matched
 *   against the label definitions (do_perfect_match); any unmatched label marks
 *   the chunk as having wrong labels. A chunk with at least one copy and either
 *   vc!=goal or wrong labels is enqueued with a priority derived from vc/tdc/goal.
 */
static inline void chunk_priority_queue_check(chunk *c,uint8_t checklabels) {
	slist *s;
	uint32_t vc,tdc;
	uint32_t goal;
	uint8_t j;
	static uint16_t *servers = NULL;
	uint32_t servcnt;
	uint32_t **labelmasks;
	uint32_t labelcnt;
	int32_t *matching;
	uint8_t wronglabels;
#ifdef MFSDEBUG
	uint8_t debug;
#endif

	if (c==NULL) {
		if (servers==NULL && checklabels==0) {
			servers = malloc(sizeof(uint16_t)*MAXCSCOUNT);
			passert(servers);
		}
		if (servers!=NULL && checklabels==1) {
			free(servers);
			// reset the pointer: the "servers==NULL" test below relies on it,
			// and a stale value would lead to use-after-free / double free
			servers = NULL;
		}
		return;
	}

#ifdef MFSDEBUG
	debug = ((c->chunkid&0xFFF)==0)?1:0;
#endif
	if (c->ondangerlist || servers==NULL || c->sclassid==0 || c->fcount==0 || c->lockedto+3600>=(uint32_t)main_time()) {
#ifdef MFSDEBUG
		if (debug) {
			mfsdebug("chunk_priority_queue_check ; chunkid=%016"PRIX64" ; ignore: ondangerlist=%u ; servers%c=NULL ; sclassid=%u ; fcount=%u ; lockedto=%"PRIu32,c->chunkid,c->ondangerlist,(servers==NULL)?'=':'!',c->sclassid,c->fcount,c->lockedto);
		}
#endif
		return;
	}
	// count regular valid copies and valid copies marked for removal
	vc = 0;
	tdc = 0;
	for (s=c->slisthead ; s ; s=s->next) {
		switch (s->valid) {
		case TDVALID:
			tdc++;
			break;
		case VALID:
			vc++;
			break;
		}
	}
	wronglabels = 0;
	goal = sclass_get_keeparch_goal(c->sclassid,c->archflag);
	if (((DoNotUseSameIP | DoNotUseSameRack | LabelUniqueMask) || sclass_has_keeparch_labels(c->sclassid,c->archflag)) && vc >= goal && checklabels) {
		servcnt = 0;
		for (s=c->slisthead ; s ; s=s->next) {
			if (s->valid==VALID) {
				servers[servcnt++] = s->csid;
			}
		}
		labelcnt = sclass_get_keeparch_labelmasks(c->sclassid,c->archflag,&labelmasks);
		matching = do_perfect_match(labelcnt,servcnt,labelmasks,servers);
		for (j=0 ; j<labelcnt ; j++) {
			if (matching[j]<0) { // there are unmatched labels
				wronglabels = 1;
				break;
			}
		}
	}
	if (vc+tdc > 0 && (vc != goal || wronglabels)) { // wrong-goal chunk
		if (vc+tdc==1 && goal>2) { // highest priority - chunks with one copy and high goal
			j = DPRIORITY_ENDANGERED_HIGHGOAL;
		} else if (vc+tdc==1 && goal==2) { // next priority - chunks with one copy
			j = DPRIORITY_ENDANGERED;
		} else if (vc==1 && tdc>0) { // next priority - chunks on one regular disk and some "marked for removal" disks
			j = DPRIORITY_UNDERGOAL_MFR;
		} else if (tdc>0) { // next priority - chunks on "marked for removal" disks
			j = DPRIORITY_MFR;
		} else if (vc < goal) { // next priority - standard undergoal chunks
			j = DPRIORITY_UNDERGOAL;
		} else if (wronglabels) { // next priority - changed labels
			j = DPRIORITY_WRONGLABEL;
		} else { // lowest priority - overgoal
			j = DPRIORITY_OVERGOAL;
		}
#ifdef MFSDEBUG
		if (debug) {
			mfsdebug("chunk_priority_queue_check ; chunkid=%016"PRIX64" ; enqueue: priority=%u ; vc=%u ; tdc=%u ; goal=%u ; wronglabels=%u",c->chunkid,j,vc,tdc,goal,wronglabels);
		}
#endif
		chunk_priority_enqueue(j,c);
#ifdef MFSDEBUG
	} else if (debug) {
		mfsdebug("chunk_priority_queue_check ; chunkid=%016"PRIX64" ; exit: vc=%u ; tdc=%u ; goal=%u ; wronglabels=%u",c->chunkid,vc,tdc,goal,wronglabels);
#endif
	}
}
1034
1035 /* --- */
1036
chunk_addopchunk(uint16_t csid,uint64_t chunkid)1037 void chunk_addopchunk(uint16_t csid,uint64_t chunkid) {
1038 csopchunk *csop;
1039 csop = malloc(sizeof(csopchunk));
1040 csop->chunkid = chunkid;
1041 csop->status = MFS_ERROR_MISMATCH;
1042 csop->next = cstab[csid].opchunks;
1043 cstab[csid].opchunks = csop;
1044 opsinprogress++;
1045 return;
1046 }
1047
chunk_statusopchunk(uint16_t csid,uint64_t chunkid,uint8_t status)1048 void chunk_statusopchunk(uint16_t csid,uint64_t chunkid,uint8_t status) {
1049 csopchunk *csop;
1050 for (csop=cstab[csid].opchunks ; csop!=NULL; csop=csop->next) {
1051 if (csop->chunkid==chunkid) {
1052 csop->status = status;
1053 return;
1054 }
1055 }
1056 }
1057
chunk_delopchunk(uint16_t csid,uint64_t chunkid)1058 uint8_t chunk_delopchunk(uint16_t csid,uint64_t chunkid) {
1059 csopchunk **csopp,*csop;
1060 uint8_t status;
1061
1062 status = MFS_ERROR_MISMATCH;
1063 csopp = &(cstab[csid].opchunks);
1064 while ((csop = (*csopp))) {
1065 if (csop->chunkid == chunkid) {
1066 status = csop->status;
1067 *csopp = csop->next;
1068 free(csop);
1069 if (opsinprogress>0) {
1070 opsinprogress--;
1071 }
1072 } else {
1073 csopp = &(csop->next);
1074 }
1075 }
1076 return status;
1077 }
1078
chunk_creation_servers(uint16_t csids[MAXCSCOUNT],uint8_t sclassid,uint8_t * olflag,uint32_t clientip)1079 static inline uint16_t chunk_creation_servers(uint16_t csids[MAXCSCOUNT],uint8_t sclassid,uint8_t *olflag,uint32_t clientip) {
1080 uint16_t tmpcsids[MAXCSCOUNT];
1081 int32_t *matching;
1082 uint32_t **labelmasks;
1083 uint8_t labelcnt;
1084 uint8_t sclass_mode;
1085 uint16_t servcount;
1086 uint16_t goodlabelscount;
1087 uint16_t overloaded;
1088 uint32_t i,j;
1089 uint32_t dist;
1090 uint16_t cpos,fpos;
1091 int32_t x;
1092
1093 servcount = matocsserv_getservers_wrandom(csids,&overloaded);
1094 if (servcount==0) {
1095 *olflag = (overloaded>0)?1:0;
1096 return 0;
1097 }
1098 sclass_mode = sclass_get_mode(sclassid);
1099 labelcnt = sclass_get_create_goal(sclassid);
1100 if (servcount < labelcnt && servcount + overloaded >= labelcnt) {
1101 *olflag = 1;
1102 return 0;
1103 } else {
1104 *olflag = 0;
1105 }
1106
1107 if (CreationsRespectTopology>0) {
1108 cpos = 0;
1109 fpos = MAXCSCOUNT;
1110 for (i=0 ; i<servcount ; i++) {
1111 dist = topology_distance(matocsserv_server_get_ip(cstab[csids[i]].ptr),clientip);
1112 if (dist < CreationsRespectTopology) { // close
1113 tmpcsids[cpos++] = csids[i];
1114 } else { // far
1115 tmpcsids[--fpos] = csids[i];
1116 }
1117 }
1118 if (cpos!=0 && fpos!=MAXCSCOUNT) {
1119 for (i=0 ; i<cpos ; i++) {
1120 csids[i] = tmpcsids[i];
1121 }
1122 for (i=fpos,j=servcount-1 ; i<MAXCSCOUNT ; i++,j--) {
1123 csids[j] = tmpcsids[i];
1124 }
1125 }
1126 }
1127
1128 if (sclass_has_create_labels(sclassid)) {
1129 labelcnt = sclass_get_create_labelmasks(sclassid,&labelmasks);
1130
1131 // reverse server list
1132 for (i=0 ; i<servcount/2 ; i++) {
1133 x = csids[i];
1134 csids[i] = csids[servcount-1-i];
1135 csids[servcount-1-i] = x;
1136 }
1137
1138 // match servers to labels
1139 matching = do_perfect_match(labelcnt,servcount,labelmasks,csids);
1140
1141 if (sclass_mode != SCLASS_MODE_STRICT) {
1142 goodlabelscount = 0;
1143 // extend matching to fulfill goal
1144 for (i=0 ; i<labelcnt ; i++) {
1145 if (matching[i]<0) {
1146 for (j=0 ; j<servcount ; j++) {
1147 if (matching[labelcnt+servcount-j-1]<0) {
1148 matching[i] = labelcnt+servcount-j-1;
1149 matching[labelcnt+servcount-j-1] = i;
1150 break;
1151 }
1152 }
1153 } else {
1154 goodlabelscount++;
1155 }
1156 if (matching[i]<0) { // no more servers
1157 break;
1158 }
1159 }
1160
1161 if (sclass_mode == SCLASS_MODE_STD) {
1162 if (goodlabelscount < labelcnt && goodlabelscount + overloaded >= labelcnt) {
1163 *olflag = 1;
1164 return 0;
1165 }
1166 }
1167
1168 }
1169
1170 // setting servers in proper order
1171 i = 0;
1172 j = servcount-1;
1173 while (i<j) {
1174 while (i<j && matching[labelcnt+i]>=0) {
1175 i++;
1176 }
1177 while (i<j && matching[labelcnt+j]<0) {
1178 j--;
1179 }
1180 if (i<j) {
1181 x = matching[labelcnt+i];
1182 matching[labelcnt+i] = matching[labelcnt+j];
1183 matching[labelcnt+j] = x;
1184 x = csids[i];
1185 csids[i] = csids[j];
1186 csids[j] = x;
1187 }
1188 }
1189 if (sclass_mode == SCLASS_MODE_STRICT) {
1190 if (i < labelcnt && i + overloaded >= labelcnt) {
1191 *olflag = 1;
1192 return 0;
1193 }
1194 return i;
1195 }
1196 }
1197 return servcount;
1198 }
1199
/*
 * Starts a SET_VERSION operation (version+1) on every copy of the chunk that
 * is not INVALID, DEL, WVER or TDWVER.
 * Each such copy is switched to BUSY (or TDBUSY for TDVALID/TDBUSY copies),
 * a "set chunk version" command is sent to its server and the operation is
 * registered with chunk_addopchunk().
 * When at least one command was sent the chunk version is increased and
 * a SETVERSION changelog entry is written; otherwise the client module is
 * notified with MFS_ERROR_CHUNKLOST.
 */
void chunk_emergency_increase_version(chunk *c) {
	slist *copy;
	uint32_t started = 0;

	for (copy=c->slisthead ; copy!=NULL ; copy=copy->next) {
		if (copy->valid==INVALID || copy->valid==DEL || copy->valid==WVER || copy->valid==TDWVER) {
			continue;
		}
		copy->valid = (copy->valid==TDVALID || copy->valid==TDBUSY)?TDBUSY:BUSY;
		copy->version = c->version+1;
		stats_chunkops[CHUNK_OP_CHANGE_TRY]++;
		matocsserv_send_setchunkversion(cstab[copy->csid].ptr,c->chunkid,c->version+1,c->version);
		chunk_addopchunk(copy->csid,c->chunkid);
		started++;
	}
	if (started==0) { // should never happen
		matoclserv_chunk_status(c->chunkid,MFS_ERROR_CHUNKLOST);
		return;
	}
	c->interrupted = 0;
	c->operation = SET_VERSION;
	c->version++;
	changelog("%"PRIu32"|SETVERSION(%"PRIu64",%"PRIu32")",(uint32_t)main_time(),c->chunkid,c->version);
}
1228
/*
 * Removes from the chunk's copy list all copies that belong to chunkservers
 * marked as not valid in 'cstab', keeps the valid-copies counters in sync and
 * finishes / interrupts an operation that was in progress on the chunk.
 *
 * Nothing is done when both 'discservers' and 'discservers_next' are NULL.
 *
 * Returns 1 when the chunk itself has been deleted (the pointer must not be
 * used by the caller any more), 0 otherwise.
 */
static inline int chunk_remove_disconnected_chunks(chunk *c) {
	uint8_t opfinished,validcopies,disc;
	uint8_t verfixed;
	slist *s,**st;

	if (discservers==NULL && discservers_next==NULL) {
		return 0;
	}
	disc = 0;
	st = &(c->slisthead);
	while (*st) {
		s = *st;
		if (!cstab[s->csid].valid) {
			// copy on a disconnected server - fix counters and unlink it
			if (s->valid==TDBUSY || s->valid==TDVALID) {
				chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies-1,c->regularvalidcopies,c->regularvalidcopies);
				c->allvalidcopies--;
			}
			if (s->valid==BUSY || s->valid==VALID) {
				chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies-1,c->regularvalidcopies,c->regularvalidcopies-1);
				c->allvalidcopies--;
				c->regularvalidcopies--;
			}
			if (c->writeinprogress && s->valid!=INVALID && s->valid!=DEL && s->valid!=WVER && s->valid!=TDWVER) { // pro forma
				matocsserv_write_counters(cstab[s->csid].ptr,0);
			}
			c->needverincrease = 1;
			*st = s->next;
			slist_free(s);
			disc = 1;
		} else {
			st = &(s->next);
		}
	}
	if (disc==0) {
		return 0;
	}
	// unused chunk without any copy left - it can be removed
	if (c->lockedto<(uint32_t)main_time() && c->slisthead==NULL && c->fcount==0 && c->ondangerlist==0 && chunk_counters_in_progress()==0 && csdb_have_all_servers()) {
		changelog("%"PRIu32"|CHUNKDEL(%"PRIu64",%"PRIu32")",(uint32_t)main_time(),c->chunkid,c->version);
		chunk_delete(c);
		return 1;
	}
	if (c->operation!=NONE) {
		validcopies=0;
		opfinished=1;
		for (s=c->slisthead ; s ; s=s->next) {
			if (s->valid==BUSY || s->valid==TDBUSY) {
				opfinished=0;
			}
			if (s->valid==VALID || s->valid==TDVALID) {
				validcopies=1;
			}
		}

		if (opfinished && validcopies==0 && (c->operation==SET_VERSION || c->operation==TRUNCATE)) { // we know that version increase was just not completed, so all WVER chunks with version exactly one lower than chunk version are actually VALID copies
			verfixed = 0;
			for (s=c->slisthead ; s!=NULL ; s=s->next) {
				if (s->version+1==c->version) {
					if (s->valid==TDWVER) {
						verfixed = 1;
						s->valid = TDVALID;
						chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies+1,c->regularvalidcopies,c->regularvalidcopies);
						c->allvalidcopies++;
					} else if (s->valid==WVER) {
						verfixed = 1;
						s->valid = VALID;
						chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies+1,c->regularvalidcopies,c->regularvalidcopies+1);
						c->allvalidcopies++;
						c->regularvalidcopies++;
					}
				}
			}
			// Roll the version back exactly once and only after all copies have
			// been examined - doing it inside the loop would change c->version
			// while other copies still have to be compared against it and would
			// decrease the version (and log it) again for every following copy.
			if (verfixed) {
				c->version--;
				changelog("%"PRIu32"|SETVERSION(%"PRIu64",%"PRIu32")",(uint32_t)main_time(),c->chunkid,c->version);
			}
			// we continue because we still want to return status not done to matoclserv module
		}

		if (opfinished) {
			uint8_t nospace,status;
			// drop pending-operation records; remember whether all of them
			// (if any) ended with "no space"
			nospace = 1;
			for (s=c->slisthead ; s ; s=s->next) {
				status = chunk_delopchunk(s->csid,c->chunkid);
				if (status!=MFS_ERROR_MISMATCH && status!=MFS_ERROR_NOSPACE) {
					nospace = 0;
				}
			}
			if (c->operation==REPLICATE) {
				c->operation = NONE;
				c->lockedto = 0;
				matoclserv_chunk_unlocked(c->chunkid,c);
			} else {
				if (validcopies) {
					chunk_emergency_increase_version(c);
				} else {
					if (nospace) {
						matoclserv_chunk_status(c->chunkid,MFS_ERROR_NOSPACE);
					} else {
						matoclserv_chunk_status(c->chunkid,MFS_ERROR_NOTDONE);
					}
					c->operation = NONE;
				}
			}
		} else {
			// some copies are still busy - mark the operation as interrupted
			if (c->operation!=REPLICATE) {
				c->interrupted = 1;
			}
		}
	}
	chunk_priority_queue_check(c,1);
	return 0;
}
1341
chunk_mr_increase_version(uint64_t chunkid)1342 int chunk_mr_increase_version(uint64_t chunkid) {
1343 chunk *c;
1344 c = chunk_find(chunkid);
1345 if (c==NULL) {
1346 return MFS_ERROR_NOCHUNK;
1347 }
1348 c->version++;
1349 meta_version_inc();
1350 return MFS_STATUS_OK;
1351 }
1352
1353 /* --- */
1354
/*
 * Recalculates c->sclassid from the per-class file counters kept in c->ftab
 * (c->ftab[0] holds the table length, c->ftab[id] the counter of class 'id').
 *
 * The class with the highest sclass_get_keepmax_goal() among classes with
 * a non-zero counter wins. On a tie:
 *  - while the current choice has id<=9 and no fallback was made, the later
 *    class replaces it,
 *  - when both the current choice and the new class have id>9, the goal value
 *    itself is used as the class id and later ties no longer change it.
 */
static inline void chunk_find_sclassid(chunk *c) {
	uint32_t idx;
	uint8_t goal;
	uint8_t bestgoal = 0;
	uint8_t sclassid = 0;
	uint8_t fallback = 0;

	for (idx=1 ; idx<c->ftab[0] ; idx++) {
		if (!(c->ftab[idx]>0)) {
			continue;
		}
		goal = sclass_get_keepmax_goal(idx);
		if (goal>bestgoal) {
			// strictly better goal - new leader
			bestgoal = goal;
			sclassid = idx;
			fallback = 0;
			continue;
		}
		if (goal!=bestgoal) {
			continue;
		}
		// tie
		if (sclassid<=9) {
			if (fallback==0) {
				sclassid = idx;
			}
		} else if (idx>9) {
			sclassid = goal;
			fallback = 1;
		}
	}
	massert(sclassid>0,"wrong labels set");
	c->sclassid = sclassid;
}
1383
chunk_change_file(uint64_t chunkid,uint8_t prevsclassid,uint8_t newsclassid)1384 int chunk_change_file(uint64_t chunkid,uint8_t prevsclassid,uint8_t newsclassid) {
1385 chunk *c;
1386 uint8_t oldsclassid;
1387
1388 if (prevsclassid==newsclassid) {
1389 return MFS_STATUS_OK;
1390 }
1391 c = chunk_find(chunkid);
1392 if (c==NULL) {
1393 return MFS_ERROR_NOCHUNK;
1394 }
1395 if (c->fcount==0) {
1396 syslog(LOG_WARNING,"serious structure inconsistency: (chunkid:%016"PRIX64")",c->chunkid);
1397 return MFS_ERROR_CHUNKLOST; // MFS_ERROR_STRUCTURE
1398 }
1399 oldsclassid = c->sclassid;
1400 if (c->fcount==1) {
1401 c->sclassid = newsclassid;
1402 } else {
1403 if (c->ftab==NULL) {
1404 uint32_t ftableng = prevsclassid;
1405 if (newsclassid > ftableng) {
1406 ftableng = newsclassid;
1407 }
1408 ftableng++;
1409 c->ftab = malloc(sizeof(uint32_t)*(ftableng+1));
1410 passert(c->ftab);
1411 memset(c->ftab,0,sizeof(uint32_t)*(ftableng+1));
1412 c->ftab[0] = ftableng+1;
1413 massert(c->sclassid==prevsclassid,"wrong labels set");
1414 c->ftab[prevsclassid] = c->fcount-1;
1415 c->ftab[newsclassid] = 1;
1416 chunk_find_sclassid(c);
1417 } else {
1418 if (newsclassid >= c->ftab[0]) {
1419 c->ftab = realloc(c->ftab,sizeof(uint32_t)*(newsclassid+1));
1420 passert(c->ftab);
1421 memset(c->ftab+c->ftab[0],0,sizeof(uint32_t)*(newsclassid+1-c->ftab[0]));
1422 c->ftab[0] = newsclassid+1;
1423 }
1424 massert(c->ftab[prevsclassid]>0,"wrong ftab entry");
1425 c->ftab[prevsclassid]--;
1426 c->ftab[newsclassid]++;
1427 chunk_find_sclassid(c);
1428 }
1429 }
1430 if (oldsclassid!=c->sclassid) {
1431 chunk_state_change(oldsclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies,c->regularvalidcopies,c->regularvalidcopies);
1432 chunk_priority_queue_check(c,1);
1433 } else {
1434 chunk_priority_queue_check(c,0);
1435 }
1436 return MFS_STATUS_OK;
1437 }
1438
/*
 * Detaches one file of storage class 'sclassid' from chunk 'c'.
 *
 * When it was the last file, the chunk gets class 0 and file count 0, and -
 * if 'delete_timeout' is positive - the chunk is locked for that many seconds
 * from now. Otherwise the file count (and the per-class table, if present)
 * is decreased; the table is released when only one file is left.
 *
 * Returns MFS_STATUS_OK or MFS_ERROR_CHUNKLOST when the chunk had no files.
 */
static inline int chunk_delete_file_int(chunk *c,uint8_t sclassid,uint32_t delete_timeout) {
	uint8_t before;

	if (c->fcount==0) {
		syslog(LOG_WARNING,"serious structure inconsistency: (chunkid:%016"PRIX64")",c->chunkid);
		return MFS_ERROR_CHUNKLOST; // MFS_ERROR_STRUCTURE
	}
	massert(sclassid>0,"wrong labels set");
	before = c->sclassid;
	if (c->fcount!=1) {
		if (c->ftab!=NULL) {
			c->ftab[sclassid] -= 1;
			chunk_find_sclassid(c);
		}
		c->fcount -= 1;
		if (c->fcount==1) {
			// single file left - the table is not needed any more
			free(c->ftab);
			c->ftab = NULL;
		}
	} else {
		// last file removed
		c->sclassid = 0;
		c->fcount = 0;
	}
	if (before!=c->sclassid) {
		chunk_state_change(before,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies,c->regularvalidcopies,c->regularvalidcopies);
	}
	if (delete_timeout>0 && c->fcount==0) {
		c->lockedto = (uint32_t)main_time()+delete_timeout;
	}
	return MFS_STATUS_OK;
}
1473
/*
 * Attaches one more file of storage class 'sclassid' to chunk 'c'.
 *
 * The first file simply sets the chunk class. A file with the same class as
 * the chunk only increases counters. A file with a different class creates
 * or extends the per-class counter table (c->ftab) and makes the chunk class
 * be recalculated.
 *
 * Always returns MFS_STATUS_OK.
 */
static inline int chunk_add_file_int(chunk *c,uint8_t sclassid) {
	uint8_t before;
	uint32_t tabsize,oldsize;

	massert(sclassid>0,"wrong labels set");
	before = c->sclassid;
	if (c->fcount==0) {
		c->sclassid = sclassid;
		c->fcount = 1;
	} else if (sclassid==c->sclassid) {
		c->fcount++;
		if (c->ftab!=NULL) {
			c->ftab[sclassid]++;
		}
	} else if (c->ftab==NULL) {
		// first file with a different class - create the table
		tabsize = ((sclassid > c->sclassid)?sclassid:c->sclassid)+2;
		c->ftab = malloc(sizeof(uint32_t)*tabsize);
		passert(c->ftab);
		memset(c->ftab,0,sizeof(uint32_t)*tabsize);
		c->ftab[0] = tabsize;
		c->ftab[c->sclassid] = c->fcount;
		c->ftab[sclassid] = 1;
		c->fcount++;
		chunk_find_sclassid(c);
	} else {
		oldsize = c->ftab[0];
		if (sclassid >= oldsize) {
			// grow the table and clear the new part
			tabsize = sclassid+1;
			c->ftab = realloc(c->ftab,sizeof(uint32_t)*tabsize);
			passert(c->ftab);
			memset(c->ftab+oldsize,0,sizeof(uint32_t)*(tabsize-oldsize));
			c->ftab[0] = tabsize;
		}
		c->ftab[sclassid]++;
		c->fcount++;
		chunk_find_sclassid(c);
	}
	if (before!=c->sclassid) {
		chunk_state_change(before,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies,c->regularvalidcopies,c->regularvalidcopies);
	}
	return MFS_STATUS_OK;
}
1519
chunk_delete_file(uint64_t chunkid,uint8_t sclassid)1520 int chunk_delete_file(uint64_t chunkid,uint8_t sclassid) {
1521 chunk *c;
1522 c = chunk_find(chunkid);
1523 if (c==NULL) {
1524 return MFS_ERROR_NOCHUNK;
1525 }
1526 return chunk_delete_file_int(c,sclassid,0);
1527 }
1528
chunk_add_file(uint64_t chunkid,uint8_t sclassid)1529 int chunk_add_file(uint64_t chunkid,uint8_t sclassid) {
1530 chunk *c;
1531 c = chunk_find(chunkid);
1532 if (c==NULL) {
1533 return MFS_ERROR_NOCHUNK;
1534 }
1535 return chunk_add_file_int(c,sclassid);
1536 }
1537
/*
 * Switches the "write in progress" flag of the chunk on (x!=0) or off (x==0).
 * When the flag actually changes, matocsserv_write_counters() is called with
 * 'x' for every copy that is not INVALID, DEL, WVER or TDWVER.
 * Nothing happens when the flag already has the requested state.
 */
static inline void chunk_write_counters(chunk *c,uint8_t x) {
	slist *copy;
	uint8_t wanted = x?1:0;
	uint8_t current = c->writeinprogress?1:0;

	if (wanted==current) {
		return;
	}
	c->writeinprogress = wanted;
	for (copy=c->slisthead ; copy!=NULL ; copy=copy->next) {
		if (copy->valid==INVALID || copy->valid==DEL || copy->valid==WVER || copy->valid==TDWVER) {
			continue;
		}
		matocsserv_write_counters(cstab[copy->csid].ptr,x);
	}
}
1559
chunk_locked_or_busy(void * cptr)1560 int chunk_locked_or_busy(void *cptr) {
1561 chunk *c = (chunk*)cptr;
1562 return (c->lockedto<(uint32_t)(main_time()) && c->operation==NONE)?0:1;
1563 }
1564
chunk_get_validcopies(uint64_t chunkid,uint8_t * vcopies)1565 int chunk_get_validcopies(uint64_t chunkid,uint8_t *vcopies) {
1566 chunk *c;
1567 *vcopies = 0;
1568 c = chunk_find(chunkid);
1569 if (c==NULL) {
1570 return MFS_ERROR_NOCHUNK;
1571 }
1572 *vcopies = c->allvalidcopies;
1573 return MFS_STATUS_OK;
1574 }
1575
chunk_get_archflag(uint64_t chunkid,uint8_t * archflag)1576 int chunk_get_archflag(uint64_t chunkid,uint8_t *archflag) {
1577 chunk *c;
1578 c = chunk_find(chunkid);
1579 if (c==NULL) {
1580 return MFS_ERROR_NOCHUNK;
1581 }
1582 *archflag = c->archflag;
1583 return MFS_STATUS_OK;
1584 }
1585
chunk_univ_archflag(uint64_t chunkid,uint8_t archflag,uint32_t * archflagchanged)1586 int chunk_univ_archflag(uint64_t chunkid,uint8_t archflag,uint32_t *archflagchanged) {
1587 chunk *c;
1588 c = chunk_find(chunkid);
1589 if (c==NULL) {
1590 return MFS_ERROR_NOCHUNK;
1591 }
1592 if (archflag != c->archflag) {
1593 chunk_state_change(c->sclassid,c->sclassid,c->archflag,archflag,c->allvalidcopies,c->allvalidcopies,c->regularvalidcopies,c->regularvalidcopies);
1594 c->archflag = archflag;
1595 chunk_priority_queue_check(c,1);
1596 (*archflagchanged)++;
1597 }
1598 return MFS_STATUS_OK;
1599 }
1600
1601 // CHUNK_FLOOP_NOTFOUND
1602 // CHUNK_FLOOP_DELETED
1603 // CHUNK_FLOOP_MISSING_NOCOPY
1604 // CHUNK_FLOOP_MISSING_INVALID
1605 // CHUNK_FLOOP_MISSING_WRONGVERSION
1606 // CHUNK_FLOOP_UNDERGOAL_AFLAG_NOT_CHANGED
1607 // CHUNK_FLOOP_UNDERGOAL_AFLAG_CHANGED
1608 // CHUNK_FLOOP_OK_AFLAG_NOT_CHANGED
1609 // CHUNK_FLOOP_OK_AFLAG_CHANGED
chunk_fileloop_task(uint64_t chunkid,uint8_t sclassid,uint8_t aftereof,uint8_t archflag)1610 chunkfloop chunk_fileloop_task(uint64_t chunkid,uint8_t sclassid,uint8_t aftereof,uint8_t archflag) {
1611 chunk *c;
1612 slist *s;
1613 uint8_t aflagchg;
1614 c = chunk_find(chunkid);
1615 if (c==NULL) {
1616 return CHUNK_FLOOP_NOTFOUND;
1617 }
1618 if (c->allvalidcopies==0 && aftereof && c->lockedto<(uint32_t)(main_time()) && c->operation==NONE) {
1619 chunk_delete_file_int(c,sclassid,UNUSED_DELETE_TIMEOUT);
1620 return CHUNK_FLOOP_DELETED;
1621 }
1622 if (c->allvalidcopies==0) {
1623 if (c->slisthead==NULL) {
1624 return CHUNK_FLOOP_MISSING_NOCOPY;
1625 }
1626 for (s=c->slisthead ; s ; s=s->next) {
1627 if (s->valid==WVER || s->valid==TDWVER) {
1628 return CHUNK_FLOOP_MISSING_WRONGVERSION;
1629 }
1630 }
1631 return CHUNK_FLOOP_MISSING_INVALID;
1632 }
1633 if (archflag==1 && c->archflag==0) {
1634 chunk_state_change(c->sclassid,c->sclassid,c->archflag,archflag,c->allvalidcopies,c->allvalidcopies,c->regularvalidcopies,c->regularvalidcopies);
1635 c->archflag = archflag;
1636 chunk_priority_queue_check(c,1);
1637 aflagchg = 1;
1638 } else {
1639 aflagchg = 0;
1640 }
1641 if (c->allvalidcopies < sclass_get_keeparch_goal(c->sclassid,c->archflag)) {
1642 return aflagchg?CHUNK_FLOOP_UNDERGOAL_AFLAG_CHANGED:CHUNK_FLOOP_UNDERGOAL_AFLAG_NOT_CHANGED;
1643 }
1644 return aflagchg?CHUNK_FLOOP_OK_AFLAG_CHANGED:CHUNK_FLOOP_OK_AFLAG_NOT_CHANGED;
1645 }
1646
chunk_read_check(uint32_t ts,uint64_t chunkid)1647 int chunk_read_check(uint32_t ts,uint64_t chunkid) {
1648 chunk *c;
1649 c = chunk_find(chunkid);
1650 if (c==NULL) {
1651 return MFS_ERROR_NOCHUNK;
1652 }
1653 if (c->lockedto>=ts) {
1654 return MFS_ERROR_LOCKED;
1655 }
1656 if (c->operation != NONE) {
1657 return MFS_ERROR_CHUNKBUSY;
1658 }
1659 return MFS_STATUS_OK;
1660 }
1661
chunk_univ_multi_modify(uint32_t ts,uint8_t mr,uint8_t continueop,uint64_t * nchunkid,uint64_t ochunkid,uint8_t sclassid,uint8_t * opflag,uint32_t clientip)1662 int chunk_univ_multi_modify(uint32_t ts,uint8_t mr,uint8_t continueop,uint64_t *nchunkid,uint64_t ochunkid,uint8_t sclassid,uint8_t *opflag,uint32_t clientip) {
1663 uint16_t csids[MAXCSCOUNT];
1664 static void **chosen = NULL;
1665 static uint32_t chosenleng = 0;
1666 uint16_t servcount=0;
1667 uint32_t vc;
1668 uint8_t overloaded;
1669 slist *os,*s;
1670 uint32_t i;
1671 chunk *oc,*c;
1672 uint8_t csstable,csalldata;
1673 uint8_t cschanges;
1674
1675 if (ts>(starttime+10) && csregisterinprogress==0) {
1676 csstable = 1;
1677 } else {
1678 csstable = 0;
1679 }
1680
1681 cschanges = (csstable==0 || (csreceivingchunks&2))?1:0;
1682
1683 if (chunk_counters_in_progress()==0 && csdb_have_all_servers()) {
1684 csalldata = 1;
1685 } else {
1686 csalldata = 0;
1687 }
1688
1689 if (ochunkid==0) { // new chunk
1690 if (mr==0) {
1691 servcount = chunk_creation_servers(csids,sclassid,&overloaded,clientip);
1692 if (servcount==0) {
1693 if (overloaded || csalldata==0) {
1694 return MFS_ERROR_EAGAIN; // try again for ever
1695 } else {
1696 uint16_t scount;
1697 scount = matocsserv_servers_count();
1698 if (scount>0 && csstable) {
1699 return MFS_ERROR_NOSPACE; // return error
1700 } else {
1701 return MFS_ERROR_NOCHUNKSERVERS; // try again
1702 }
1703 }
1704 }
1705 c = chunk_new(nextchunkid++);
1706 c->version = 1;
1707 c->interrupted = 0;
1708 c->operation = CREATE;
1709 chunk_add_file_int(c,sclassid);
1710 if (servcount<sclass_get_create_goal(sclassid)) {
1711 c->allvalidcopies = servcount;
1712 c->regularvalidcopies = servcount;
1713 } else {
1714 c->allvalidcopies = sclass_get_create_goal(sclassid);
1715 c->regularvalidcopies = sclass_get_create_goal(sclassid);
1716 }
1717 if (c->allvalidcopies>chosenleng) {
1718 chosenleng = c->allvalidcopies+10;
1719 chosen = malloc(sizeof(void*)*chosenleng);
1720 passert(chosen);
1721 }
1722 for (i=0 ; i<c->allvalidcopies ; i++) {
1723 s = slist_malloc();
1724 s->csid = csids[i];
1725 s->valid = BUSY;
1726 s->version = c->version;
1727 s->next = c->slisthead;
1728 c->slisthead = s;
1729 chosen[i] = cstab[s->csid].ptr;
1730 stats_chunkops[CHUNK_OP_CREATE_TRY]++;
1731 matocsserv_send_createchunk(cstab[s->csid].ptr,c->chunkid,c->version);
1732 chunk_addopchunk(s->csid,c->chunkid);
1733 }
1734 matocsserv_useservers_wrandom(chosen,c->allvalidcopies);
1735 chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,0,c->allvalidcopies,0,c->regularvalidcopies);
1736 *opflag=1;
1737 *nchunkid = c->chunkid;
1738 } else {
1739 if (*nchunkid != nextchunkid) {
1740 return MFS_ERROR_MISMATCH;
1741 }
1742 c = chunk_new(nextchunkid++);
1743 c->version = 1;
1744 chunk_add_file_int(c,sclassid);
1745 }
1746 } else {
1747 c = NULL;
1748 oc = chunk_find(ochunkid);
1749 if (oc && mr==0) {
1750 if (chunk_remove_disconnected_chunks(oc)) {
1751 oc = NULL;
1752 }
1753 }
1754 if (oc==NULL) {
1755 return MFS_ERROR_NOCHUNK;
1756 }
1757 if (mr==0 && oc->lockedto>=ts && continueop==0) {
1758 return MFS_ERROR_LOCKED;
1759 }
1760 if (oc->fcount==1) {
1761 c = oc;
1762 if (mr==0) {
1763 *nchunkid = ochunkid;
1764 if (c->operation!=NONE) {
1765 return MFS_ERROR_CHUNKBUSY;
1766 }
1767 if (cschanges) {
1768 vc = 0;
1769 for (s=c->slisthead ; s ; s=s->next) {
1770 if (s->valid==VALID) {
1771 vc++;
1772 }
1773 }
1774 if (vc < sclass_get_keeparch_goal(c->sclassid,c->archflag)) {
1775 return MFS_ERROR_EAGAIN; // just try again later
1776 }
1777 }
1778 if (c->needverincrease) {
1779 i=0;
1780 for (s=c->slisthead ;s ; s=s->next) {
1781 if (s->valid!=INVALID && s->valid!=DEL && s->valid!=WVER && s->valid!=TDWVER) {
1782 if (s->valid==TDVALID || s->valid==TDBUSY) {
1783 s->valid = TDBUSY;
1784 } else {
1785 s->valid = BUSY;
1786 }
1787 s->version = c->version+1;
1788 stats_chunkops[CHUNK_OP_CHANGE_TRY]++;
1789 matocsserv_send_setchunkversion(cstab[s->csid].ptr,ochunkid,c->version+1,c->version);
1790 chunk_addopchunk(s->csid,c->chunkid);
1791 i++;
1792 }
1793 }
1794 if (i>0) {
1795 c->interrupted = 0;
1796 c->operation = SET_VERSION;
1797 c->version++;
1798 *opflag = 1;
1799 } else {
1800 if (csalldata) {
1801 return MFS_ERROR_CHUNKLOST; // return error
1802 } else {
1803 return MFS_ERROR_CSNOTPRESENT; // try again
1804 }
1805 }
1806 } else {
1807 *opflag = 0;
1808 }
1809 } else {
1810 if (*nchunkid != ochunkid) {
1811 return MFS_ERROR_MISMATCH;
1812 }
1813 if (*opflag) {
1814 c->version++;
1815 }
1816 }
1817 } else {
1818 if (oc->fcount==0) { // it's serious structure error
1819 if (mr==0) {
1820 syslog(LOG_WARNING,"serious structure inconsistency: (chunkid:%016"PRIX64")",ochunkid);
1821 } else {
1822 printf("serious structure inconsistency: (chunkid:%016"PRIX64")\n",ochunkid);
1823 }
1824 return MFS_ERROR_CHUNKLOST; // MFS_ERROR_STRUCTURE
1825 }
1826 if (mr==0) {
1827 if (oc->operation!=NONE) {
1828 return MFS_ERROR_CHUNKBUSY;
1829 }
1830 if (cschanges) {
1831 vc = 0;
1832 for (os=oc->slisthead ; os ; os=os->next) {
1833 if (os->valid==VALID) {
1834 vc++;
1835 }
1836 }
1837 if (vc < sclass_get_keeparch_goal(oc->sclassid,oc->archflag)) {
1838 return MFS_ERROR_EAGAIN; // just try again later
1839 }
1840 }
1841 i=0;
1842 for (os=oc->slisthead ;os ; os=os->next) {
1843 if (os->valid!=INVALID && os->valid!=DEL && os->valid!=WVER && os->valid!=TDWVER) {
1844 if (c==NULL) {
1845 c = chunk_new(nextchunkid++);
1846 c->version = 1;
1847 c->interrupted = 0;
1848 c->operation = DUPLICATE;
1849 chunk_delete_file_int(oc,sclassid,0);
1850 chunk_add_file_int(c,sclassid);
1851 }
1852 s = slist_malloc();
1853 s->csid = os->csid;
1854 s->valid = BUSY;
1855 s->version = c->version;
1856 s->next = c->slisthead;
1857 c->slisthead = s;
1858 c->allvalidcopies++;
1859 c->regularvalidcopies++;
1860 stats_chunkops[CHUNK_OP_CHANGE_TRY]++;
1861 matocsserv_send_duplicatechunk(cstab[s->csid].ptr,c->chunkid,c->version,oc->chunkid,oc->version);
1862 chunk_addopchunk(s->csid,c->chunkid);
1863 i++;
1864 }
1865 }
1866 if (c!=NULL) {
1867 chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,0,c->allvalidcopies,0,c->regularvalidcopies);
1868 }
1869 if (i>0) {
1870 *nchunkid = c->chunkid;
1871 *opflag=1;
1872 } else {
1873 if (csalldata) {
1874 return MFS_ERROR_CHUNKLOST; // return error
1875 } else {
1876 return MFS_ERROR_CSNOTPRESENT; // try again
1877 }
1878 }
1879 } else {
1880 if (*nchunkid != nextchunkid) {
1881 return MFS_ERROR_MISMATCH;
1882 }
1883 c = chunk_new(nextchunkid++);
1884 c->version = 1;
1885 chunk_delete_file_int(oc,sclassid,0);
1886 chunk_add_file_int(c,sclassid);
1887 *nchunkid = c->chunkid;
1888 }
1889 }
1890 }
1891
1892 c->lockedto = ts+LOCKTIMEOUT;
1893 chunk_write_counters(c,1);
1894 return MFS_STATUS_OK;
1895 }
1896
/*
 * Live-request variant of chunk_univ_multi_modify(): uses the current time
 * and mr=0.
 */
int chunk_multi_modify(uint8_t continueop,uint64_t *nchunkid,uint64_t ochunkid,uint8_t sclassid,uint8_t *opflag,uint32_t clientip) {
	uint32_t now = main_time();

	return chunk_univ_multi_modify(now,0,continueop,nchunkid,ochunkid,sclassid,opflag,clientip);
}
1900
/*
 * Replay variant of chunk_univ_multi_modify(): uses the given timestamp,
 * mr=1, continueop=0 and client ip 0. 'opflag' is passed by value, so
 * changes made to it by the callee are not visible to the caller.
 */
int chunk_mr_multi_modify(uint32_t ts,uint64_t *nchunkid,uint64_t ochunkid,uint8_t sclassid,uint8_t opflag) {
	uint8_t localflag = opflag;
	int status;

	status = chunk_univ_multi_modify(ts,1,0,nchunkid,ochunkid,sclassid,&localflag,0);
	return status;
}
1904
chunk_univ_multi_truncate(uint32_t ts,uint8_t mr,uint64_t * nchunkid,uint64_t ochunkid,uint32_t length,uint8_t sclassid)1905 int chunk_univ_multi_truncate(uint32_t ts,uint8_t mr,uint64_t *nchunkid,uint64_t ochunkid,uint32_t length,uint8_t sclassid) {
1906 slist *os,*s;
1907 uint32_t i;
1908 chunk *oc,*c;
1909 uint8_t csstable,csalldata;
1910 uint8_t cschanges;
1911 uint32_t vc;
1912
1913 if (ts>(starttime+10) && csregisterinprogress==0) {
1914 csstable = 1;
1915 } else {
1916 csstable = 0;
1917 }
1918
1919 cschanges = (csstable==0 || (csreceivingchunks&2))?1:0;
1920
1921 if (chunk_counters_in_progress()==0 && csdb_have_all_servers()) {
1922 csalldata = 1;
1923 } else {
1924 csalldata = 0;
1925 }
1926
1927 c=NULL;
1928 oc = chunk_find(ochunkid);
1929 if (oc && mr==0) {
1930 if (chunk_remove_disconnected_chunks(oc)) {
1931 oc = NULL;
1932 }
1933 }
1934
1935 if (oc==NULL) {
1936 return MFS_ERROR_NOCHUNK;
1937 }
1938 if (mr==0 && oc->lockedto>=ts) {
1939 return MFS_ERROR_LOCKED;
1940 }
1941 if (oc->fcount==1) {
1942 c = oc;
1943 if (mr==0) {
1944 *nchunkid = ochunkid;
1945 if (c->operation!=NONE) {
1946 return MFS_ERROR_CHUNKBUSY;
1947 }
1948 if (cschanges) {
1949 vc = 0;
1950 for (os=oc->slisthead ; os ; os=os->next) {
1951 if (os->valid==VALID) {
1952 vc++;
1953 }
1954 }
1955 if (vc < sclass_get_keeparch_goal(oc->sclassid,oc->archflag)) {
1956 return MFS_ERROR_EAGAIN; // just try again later
1957 }
1958 }
1959 i=0;
1960 for (s=c->slisthead ;s ; s=s->next) {
1961 if (s->valid!=INVALID && s->valid!=DEL && s->valid!=WVER && s->valid!=TDWVER) {
1962 if (s->valid==TDVALID || s->valid==TDBUSY) {
1963 s->valid = TDBUSY;
1964 } else {
1965 s->valid = BUSY;
1966 }
1967 s->version = c->version+1;
1968 stats_chunkops[CHUNK_OP_CHANGE_TRY]++;
1969 matocsserv_send_truncatechunk(cstab[s->csid].ptr,ochunkid,length,c->version+1,c->version);
1970 chunk_addopchunk(s->csid,c->chunkid);
1971 i++;
1972 }
1973 }
1974 if (i>0) {
1975 c->interrupted = 0;
1976 c->operation = TRUNCATE;
1977 c->version++;
1978 } else {
1979 if (csalldata) {
1980 return MFS_ERROR_CHUNKLOST; // return error
1981 } else {
1982 return MFS_ERROR_CSNOTPRESENT; // try again
1983 }
1984 }
1985 } else {
1986 if (*nchunkid != ochunkid) {
1987 return MFS_ERROR_MISMATCH;
1988 }
1989 c->version++;
1990 }
1991 } else {
1992 if (oc->fcount==0) { // it's serious structure error
1993 if (mr==0) {
1994 syslog(LOG_WARNING,"serious structure inconsistency: (chunkid:%016"PRIX64")",ochunkid);
1995 } else {
1996 printf("serious structure inconsistency: (chunkid:%016"PRIX64")\n",ochunkid);
1997 }
1998 return MFS_ERROR_CHUNKLOST; // MFS_ERROR_STRUCTURE
1999 }
2000 if (mr==0) {
2001 if (oc->operation!=NONE) {
2002 return MFS_ERROR_CHUNKBUSY;
2003 }
2004 if (cschanges) {
2005 vc = 0;
2006 for (os=oc->slisthead ; os ; os=os->next) {
2007 if (os->valid==VALID) {
2008 vc++;
2009 }
2010 }
2011 if (vc < sclass_get_keeparch_goal(oc->sclassid,oc->archflag)) {
2012 return MFS_ERROR_EAGAIN; // just try again later
2013 }
2014 }
2015 i=0;
2016 for (os=oc->slisthead ;os ; os=os->next) {
2017 if (os->valid!=INVALID && os->valid!=DEL && os->valid!=WVER && os->valid!=TDWVER) {
2018 if (c==NULL) {
2019 c = chunk_new(nextchunkid++);
2020 c->version = 1;
2021 c->interrupted = 0;
2022 c->operation = DUPTRUNC;
2023 chunk_delete_file_int(oc,sclassid,0);
2024 chunk_add_file_int(c,sclassid);
2025 }
2026 s = slist_malloc();
2027 s->csid = os->csid;
2028 s->valid = BUSY;
2029 s->version = c->version;
2030 s->next = c->slisthead;
2031 c->slisthead = s;
2032 c->allvalidcopies++;
2033 c->regularvalidcopies++;
2034 stats_chunkops[CHUNK_OP_CHANGE_TRY]++;
2035 matocsserv_send_duptruncchunk(cstab[s->csid].ptr,c->chunkid,c->version,oc->chunkid,oc->version,length);
2036 chunk_addopchunk(s->csid,c->chunkid);
2037 i++;
2038 }
2039 }
2040 if (c!=NULL) {
2041 chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,0,c->allvalidcopies,0,c->regularvalidcopies);
2042 }
2043 if (i>0) {
2044 *nchunkid = c->chunkid;
2045 } else {
2046 if (csalldata) {
2047 return MFS_ERROR_CHUNKLOST; // return error
2048 } else {
2049 return MFS_ERROR_CSNOTPRESENT; // try again
2050 }
2051 }
2052 } else {
2053 if (*nchunkid != nextchunkid) {
2054 return MFS_ERROR_MISMATCH;
2055 }
2056 c = chunk_new(nextchunkid++);
2057 c->version = 1;
2058 chunk_delete_file_int(oc,sclassid,0);
2059 chunk_add_file_int(c,sclassid);
2060 *nchunkid = c->chunkid;
2061 }
2062 }
2063
2064 c->lockedto=ts+LOCKTIMEOUT;
2065 return MFS_STATUS_OK;
2066 }
2067
/*
 * Live-request variant of chunk_univ_multi_truncate(): uses the current time
 * and mr=0.
 */
int chunk_multi_truncate(uint64_t *nchunkid,uint64_t ochunkid,uint32_t length,uint8_t sclassid) {
	uint32_t now = main_time();

	return chunk_univ_multi_truncate(now,0,nchunkid,ochunkid,length,sclassid);
}
2071
/*
 * Replay variant of chunk_univ_multi_truncate(): uses the given timestamp,
 * mr=1 and length 0 (no command is sent to chunkservers on this path).
 */
int chunk_mr_multi_truncate(uint32_t ts,uint64_t *nchunkid,uint64_t ochunkid,uint8_t sclassid) {
	int status;

	status = chunk_univ_multi_truncate(ts,1,nchunkid,ochunkid,0,sclassid);
	return status;
}
2075
chunk_repair(uint8_t sclassid,uint64_t ochunkid,uint32_t * nversion)2076 int chunk_repair(uint8_t sclassid,uint64_t ochunkid,uint32_t *nversion) {
2077 uint32_t bestversion;
2078 chunk *c;
2079 slist *s;
2080
2081 *nversion=0;
2082 if (ochunkid==0) {
2083 return 0; // not changed
2084 }
2085
2086 c = chunk_find(ochunkid);
2087 if (c==NULL) { // no such chunk - erase (nchunkid already is 0 - so just return with "changed" status)
2088 return 1;
2089 }
2090 if (c->lockedto>=(uint32_t)main_time()) { // can't repair locked chunks - but if it's locked, then likely it doesn't need to be repaired
2091 return 0;
2092 }
2093 chunk_write_counters(c,0);
2094 bestversion = 0;
2095 for (s=c->slisthead ; s ; s=s->next) {
2096 if (cstab[s->csid].valid) {
2097 if (s->valid == VALID || s->valid == TDVALID || s->valid == BUSY || s->valid == TDBUSY) { // found chunk that is ok - so return
2098 return 0;
2099 }
2100 if (s->valid == WVER || s->valid == TDWVER) {
2101 if (s->version>=bestversion) {
2102 bestversion = s->version;
2103 }
2104 }
2105 }
2106 }
2107 if (bestversion==0) { // didn't find sensible chunk - so erase it
2108 chunk_delete_file_int(c,sclassid,0);
2109 return 1;
2110 }
2111 if (c->allvalidcopies>0 || c->regularvalidcopies>0) {
2112 if (c->allvalidcopies>0) {
2113 syslog(LOG_WARNING,"wrong all valid copies counter - (counter value: %u, should be: 0) - fixed",c->allvalidcopies);
2114 }
2115 if (c->regularvalidcopies>0) {
2116 syslog(LOG_WARNING,"wrong regular valid copies counter - (counter value: %u, should be: 0) - fixed",c->regularvalidcopies);
2117 }
2118 chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,0,c->regularvalidcopies,0);
2119 c->allvalidcopies = 0;
2120 c->regularvalidcopies = 0;
2121 }
2122 c->version = bestversion;
2123 for (s=c->slisthead ; s ; s=s->next) {
2124 if (s->version==bestversion && cstab[s->csid].valid) {
2125 if (s->valid==WVER) {
2126 s->valid = VALID;
2127 c->allvalidcopies++;
2128 c->regularvalidcopies++;
2129 } else if (s->valid==TDWVER) {
2130 s->valid = TDVALID;
2131 c->allvalidcopies++;
2132 }
2133 }
2134 }
2135 *nversion = bestversion;
2136 chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,0,c->allvalidcopies,0,c->regularvalidcopies);
2137 c->needverincrease = 1;
2138 return 1;
2139 }
2140
chunk_mr_set_version(uint64_t chunkid,uint32_t version)2141 int chunk_mr_set_version(uint64_t chunkid,uint32_t version) {
2142 chunk *c;
2143 c = chunk_find(chunkid);
2144 if (c==NULL) {
2145 return MFS_ERROR_NOCHUNK;
2146 }
2147 c->version = version;
2148 meta_version_inc();
2149 return MFS_STATUS_OK;
2150 }
2151
2152 /* ---- */
2153
/* Description of one chunk copy location, used for sorting locations. */
typedef struct locsort {
	uint32_t ip;		// chunkserver address
	uint16_t port;		// chunkserver port
	uint32_t csver;		// chunkserver version
	uint32_t labelmask;	// chunkserver label mask
	uint32_t dist;		// primary sort key (lower goes first)
	uint32_t rnd;		// secondary sort key used when 'dist' values are equal
} locsort;

/*
 * qsort() comparator for 'locsort' records: orders by 'dist' ascending and,
 * for equal 'dist', by 'rnd' ascending.
 * Returns -1, 0 or 1.
 */
int chunk_locsort_cmp(const void *aa,const void *bb) {
	const locsort *left = (const locsort*)aa;
	const locsort *right = (const locsort*)bb;

	if (left->dist!=right->dist) {
		return (left->dist<right->dist)?-1:1;
	}
	if (left->rnd!=right->rnd) {
		return (left->rnd<right->rnd)?-1:1;
	}
	return 0;
}
2177
chunk_get_version_and_csdata(uint8_t mode,uint64_t chunkid,uint32_t cuip,uint32_t * version,uint8_t * count,uint8_t cs_data[100* 14])2178 uint8_t chunk_get_version_and_csdata(uint8_t mode,uint64_t chunkid,uint32_t cuip,uint32_t *version,uint8_t *count,uint8_t cs_data[100*14]) {
2179 chunk *c;
2180 slist *s;
2181 uint8_t i;
2182 uint8_t cnt;
2183 uint8_t *wptr;
2184 locsort lstab[100];
2185
2186 c = chunk_find(chunkid);
2187 if (c==NULL) {
2188 return MFS_ERROR_NOCHUNK;
2189 }
2190 *version = c->version;
2191 cnt=0;
2192 for (s=c->slisthead ;s ; s=s->next) {
2193 if (s->valid!=INVALID && s->valid!=DEL && s->valid!=WVER && s->valid!=TDWVER && cstab[s->csid].valid) {
2194 if (cnt<100 && matocsserv_get_csdata(cstab[s->csid].ptr,&(lstab[cnt].ip),&(lstab[cnt].port),&(lstab[cnt].csver),&(lstab[cnt].labelmask))==0) {
2195 lstab[cnt].dist = topology_distance(lstab[cnt].ip,cuip); // in the future prepare more sophisticated distance function
2196 lstab[cnt].rnd = rndu32();
2197 cnt++;
2198 }
2199 }
2200 }
2201 if (cnt==0) {
2202 *count = 0;
2203 if (chunk_counters_in_progress()==0 && csdb_have_all_servers()) {
2204 return MFS_ERROR_CHUNKLOST; // this is permanent state - chunk is definitely lost
2205 } else {
2206 return MFS_STATUS_OK;
2207 }
2208 }
2209 qsort(lstab,cnt,sizeof(locsort),chunk_locsort_cmp);
2210 wptr = cs_data;
2211 for (i=0 ; i<cnt ; i++) {
2212 put32bit(&wptr,lstab[i].ip);
2213 put16bit(&wptr,lstab[i].port);
2214 if (mode>0) {
2215 put32bit(&wptr,lstab[i].csver);
2216 }
2217 if (mode>1) {
2218 put32bit(&wptr,lstab[i].labelmask);
2219 }
2220 }
2221 *count = cnt;
2222 return MFS_STATUS_OK;
2223 }
2224
chunk_get_copies(uint64_t chunkid,uint8_t * count)2225 uint8_t chunk_get_copies(uint64_t chunkid,uint8_t *count) {
2226 chunk *c;
2227 slist *s;
2228 uint8_t cnt;
2229 uint32_t ip;
2230 uint16_t port;
2231
2232 c = chunk_find(chunkid);
2233 if (c==NULL) {
2234 return MFS_ERROR_NOCHUNK;
2235 }
2236 cnt = 0;
2237 for (s=c->slisthead ; s && cnt<100 ; s=s->next) {
2238 if (cstab[s->csid].valid && s->valid!=DEL) {
2239 if (matocsserv_get_csdata(cstab[s->csid].ptr,&ip,&port,NULL,NULL)==0) {
2240 cnt++;
2241 }
2242 }
2243 }
2244 *count = cnt;
2245 return MFS_STATUS_OK;
2246 }
2247
chunk_get_version(uint64_t chunkid,uint32_t * version)2248 uint8_t chunk_get_version(uint64_t chunkid,uint32_t *version) {
2249 chunk *c;
2250 c = chunk_find(chunkid);
2251
2252 if (c==NULL) {
2253 *version = 0;
2254 return MFS_ERROR_NOCHUNK;
2255 }
2256 *version = c->version;
2257 return MFS_STATUS_OK;
2258 }
2259
chunk_get_version_and_copies(uint64_t chunkid,uint32_t * version,uint8_t * count,uint8_t cs_data[100* 7])2260 uint8_t chunk_get_version_and_copies(uint64_t chunkid,uint32_t *version,uint8_t *count,uint8_t cs_data[100*7]) {
2261 chunk *c;
2262 slist *s;
2263 uint8_t cnt;
2264 uint32_t ip;
2265 uint16_t port;
2266 uint8_t *wptr;
2267
2268 c = chunk_find(chunkid);
2269 if (c==NULL) {
2270 return MFS_ERROR_NOCHUNK;
2271 }
2272 *version = c->version;
2273 cnt=0;
2274 wptr = cs_data;
2275
2276 for (s=c->slisthead ; s && cnt<100 ; s=s->next) {
2277 if (cstab[s->csid].valid && s->valid!=DEL) {
2278 if (matocsserv_get_csdata(cstab[s->csid].ptr,&ip,&port,NULL,NULL)==0) {
2279 put32bit(&wptr,ip);
2280 put16bit(&wptr,port);
2281 if (s->valid==VALID || s->valid==BUSY) {
2282 put8bit(&wptr,CHECK_VALID);
2283 } else if (s->valid==TDVALID || s->valid==TDBUSY) {
2284 put8bit(&wptr,CHECK_MARKEDFORREMOVAL);
2285 } else if (s->valid==WVER) {
2286 put8bit(&wptr,CHECK_WRONGVERSION);
2287 } else if (s->valid==TDWVER) {
2288 put8bit(&wptr,CHECK_WV_AND_MFR);
2289 } else {
2290 put8bit(&wptr,CHECK_INVALID);
2291 }
2292 cnt++;
2293 }
2294 }
2295 }
2296 *count = cnt;
2297 return MFS_STATUS_OK;
2298 }
2299
2300 /* ---- */
2301
chunk_mr_nextchunkid(uint64_t nchunkid)2302 int chunk_mr_nextchunkid(uint64_t nchunkid) {
2303 if (nchunkid>nextchunkid) {
2304 nextchunkid=nchunkid;
2305 meta_version_inc();
2306 return MFS_STATUS_OK;
2307 } else {
2308 return MFS_ERROR_MISMATCH;
2309 }
2310 }
2311
chunk_mr_chunkadd(uint32_t ts,uint64_t chunkid,uint32_t version,uint32_t lockedto)2312 int chunk_mr_chunkadd(uint32_t ts,uint64_t chunkid,uint32_t version,uint32_t lockedto) {
2313 chunk *c;
2314 c = chunk_find(chunkid);
2315 if (c) {
2316 return MFS_ERROR_CHUNKEXIST;
2317 }
2318 if (chunkid>nextchunkid+UINT64_C(1000000000)) {
2319 return MFS_ERROR_MISMATCH;
2320 }
2321 if (chunkid>=nextchunkid) {
2322 nextchunkid=chunkid+1;
2323 }
2324 if (lockedto>0 && lockedto<ts) {
2325 return MFS_ERROR_MISMATCH;
2326 }
2327 c = chunk_new(chunkid);
2328 c->version = version;
2329 c->lockedto = lockedto;
2330 meta_version_inc();
2331 return MFS_STATUS_OK;
2332 }
2333
chunk_mr_chunkdel(uint32_t ts,uint64_t chunkid,uint32_t version)2334 int chunk_mr_chunkdel(uint32_t ts,uint64_t chunkid,uint32_t version) {
2335 chunk *c;
2336 c = chunk_find(chunkid);
2337 if (c==NULL) {
2338 return MFS_ERROR_NOCHUNK;
2339 }
2340 if (c->version != version) {
2341 return MFS_ERROR_WRONGVERSION;
2342 }
2343 if (c->fcount!=0) {
2344 return MFS_ERROR_ACTIVE;
2345 }
2346 if (c->slisthead!=NULL) {
2347 return MFS_ERROR_CHUNKBUSY;
2348 }
2349 if (c->lockedto>=ts) {
2350 return MFS_ERROR_LOCKED;
2351 }
2352 chunk_delete(c);
2353 meta_version_inc();
2354 return MFS_STATUS_OK;
2355 }
2356
/* Flag servers whose TDVALID/TDBUSY copy of this chunk is still needed.
 * Counts VALID/BUSY copies and TDVALID/TDBUSY copies; when the first count
 * is below the goal returned by sclass_get_keeparch_goal() and at least one
 * TD copy exists, every server holding a TD copy gets
 * mfr_state = REPL_IN_PROGRESS. */
static inline void chunk_mfr_state_check(chunk *c) {
	slist *copy;
	uint8_t goal;
	uint8_t regular = 0;
	uint8_t marked = 0;

	goal = sclass_get_keeparch_goal(c->sclassid,c->archflag);
	for (copy=c->slisthead ; copy!=NULL ; copy=copy->next) {
		if (copy->valid==VALID || copy->valid==BUSY) {
			regular++;
		} else if (copy->valid==TDVALID || copy->valid==TDBUSY) {
			marked++;
		}
	}
	if (marked==0 || regular>=goal) {
		return; // nothing to flag
	}
	for (copy=c->slisthead ; copy!=NULL ; copy=copy->next) {
		if (copy->valid==TDVALID || copy->valid==TDBUSY) {
			cstab[copy->csid].mfr_state = REPL_IN_PROGRESS;
		}
	}
}
2378
/* Record that chunkserver csid reports holding chunk chunkid.
 *
 * version carries two things: the low 31 bits are the copy's version, and
 * the top bit (0x80000000) selects the TD* copy states (TDVALID / TDBUSY /
 * TDWVER) instead of the regular ones.
 *
 * Unknown chunks are either ignored (id 0 or far above nextchunkid) or
 * created with a lock so they can be deleted later. For a known chunk the
 * copy entry of this server is updated in place, or a new entry is
 * prepended to the copy list. Valid-copy counters are kept in sync through
 * chunk_state_change(). */
void chunk_server_has_chunk(uint16_t csid,uint64_t chunkid,uint32_t version) {
	chunk *c;
	slist *s;
#ifndef MFSDEBUG
	// rate limiting of the two "nonexistent chunk" warnings below
	static uint32_t loglastts = 0;
	static uint32_t ilogcount = 0;
	static uint32_t clogcount = 0;
#else
	uint8_t debug;
#endif

	cstab[csid].newchunkdelay = NEWCHUNKDELAY;
	csreceivingchunks |= 2;

	c = chunk_find(chunkid);
	if (c) {
		// a non-zero result makes the chunk be treated as unknown below
		// (presumably it was deleted by the call - confirm)
		if (chunk_remove_disconnected_chunks(c)) {
			c = NULL;
		}
	}

	if (c==NULL) {
#ifndef MFSDEBUG
		// restart both log counters once more than 60 seconds have passed
		if (loglastts+60<main_time()) {
			ilogcount=0;
			clogcount=0;
			loglastts = main_time();
		}
#endif
		if (chunkid==0 || chunkid>nextchunkid+UINT64_C(1000000000)) {
#ifndef MFSDEBUG
			if (ilogcount<10) {
#endif
				syslog(LOG_WARNING,"chunkserver (%s) has nonexistent chunk (%016"PRIX64"_%08"PRIX32"), id looks wrong - just ignore it",matocsserv_getstrip(cstab[csid].ptr),chunkid,(version&0x7FFFFFFF));
#ifndef MFSDEBUG
			} else if (ilogcount==10) {
				syslog(LOG_WARNING,"there are more nonexistent chunks to ignore - stop logging");
			}
			ilogcount++;
#endif
			return;
		}
#ifndef MFSDEBUG
		if (clogcount<10) {
#endif
			syslog(LOG_WARNING,"chunkserver (%s) has nonexistent chunk (%016"PRIX64"_%08"PRIX32"), so create it for future deletion",matocsserv_getstrip(cstab[csid].ptr),chunkid,(version&0x7FFFFFFF));
#ifndef MFSDEBUG
		} else if (clogcount==10) {
			syslog(LOG_WARNING,"there are more nonexistent chunks to create - stop logging");
		}
		clogcount++;
#endif
		if (chunkid>=nextchunkid) {
			nextchunkid=chunkid+1;
//			changelog("%"PRIu32"|NEXTCHUNKID(%"PRIu64")",main_time(),nextchunkid);
		}
		// create the chunk with the reported version and lock it for UNUSED_DELETE_TIMEOUT
		c = chunk_new(chunkid);
		c->version = (version&0x7FFFFFFF);
		c->lockedto = (uint32_t)main_time()+UNUSED_DELETE_TIMEOUT;
		changelog("%"PRIu32"|CHUNKADD(%"PRIu64",%"PRIu32",%"PRIu32")",main_time(),c->chunkid,c->version,c->lockedto);
	}
#ifdef MFSDEBUG
	// debug builds trace only chunks whose low 12 id bits are zero
	debug = ((chunkid&0xFFF)==0)?1:0;
	if (debug) {
		mfsdebug("chunk_server_has_chunk ; chunkid=%016"PRIX64" new copy: mfrstatus=%u ; csid=%"PRIu16" ; ip=%s",chunkid,(version&0x80000000)?1:0,csid,matocsserv_getstrip(cstab[csid].ptr));
		for (s=c->slisthead ; s ; s=s->next) {
			mfsdebug("chunk_server_has_chunk ; chunkid=%016"PRIX64" ; existing copy: mfrstatus=%u ; valid=%s ; csid=%"PRIu16" ; ip=%s",chunkid,(s->valid==TDVALID || s->valid==TDBUSY || s->valid==TDWVER)?1:0,validstr[s->valid],s->csid,matocsserv_getstrip(cstab[s->csid].ptr));
		}
	}
#endif
	// case 1: this server already has an entry on the copy list
	for (s=c->slisthead ; s ; s=s->next) {
		if (s->csid==csid) {
			uint8_t nextallvalidcopies = c->allvalidcopies;
			uint8_t nextregularvalidcopies = c->regularvalidcopies;
			uint8_t nextvalid;
			if (s->valid==INVALID || s->valid==DEL || s->valid==WVER || s->valid==TDWVER) { // ignore such copy
				return;
			}
			// in case of other copies just check 'mark for removal' status
			if (s->valid==BUSY || s->valid==TDBUSY) {
				if (version&0x80000000) {
					nextvalid = TDBUSY;
				} else {
					nextvalid = BUSY;
				}
			} else {
				if (version&0x80000000) {
					nextvalid = TDVALID;
				} else {
					nextvalid = VALID;
				}
			}
			if (s->valid==nextvalid) {
				return;
			}
			// take the old state out of the counters ...
			if (s->valid==VALID || s->valid==BUSY) {
				nextallvalidcopies--;
				nextregularvalidcopies--;
			} else if (s->valid==TDVALID || s->valid==TDBUSY) {
				nextallvalidcopies--;
			}
			// ... and account for the new one
			if (nextvalid==VALID || nextvalid==BUSY) {
				nextallvalidcopies++;
				nextregularvalidcopies++;
			} else if (nextvalid==TDVALID || nextvalid==TDBUSY) {
				nextallvalidcopies++;
			}
			s->valid = nextvalid;
			if (nextallvalidcopies!=c->allvalidcopies || nextregularvalidcopies!=c->regularvalidcopies) {
				chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,nextallvalidcopies,c->regularvalidcopies,nextregularvalidcopies);
				c->allvalidcopies = nextallvalidcopies;
				c->regularvalidcopies = nextregularvalidcopies;
				if (version&0x80000000) {
					chunk_mfr_state_check(c);
				}
			}
			return;
		}
	}
	// case 2: first report from this server - add a new copy entry
	s = slist_malloc();
	s->csid = csid;
	if (c->version!=(version&0x7FFFFFFF)) {
		// version differs from the chunk's - stored as a wrong-version copy, counters untouched
		if (version&0x80000000) {
			s->valid = TDWVER;
		} else {
			s->valid = WVER;
		}
		s->version = version&0x7FFFFFFF;
	} else {
		if (c->writeinprogress) {
			matocsserv_write_counters(cstab[csid].ptr,1);
		}
		if (version&0x80000000) {
			s->valid = TDVALID;
			s->version = c->version;
			chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies+1,c->regularvalidcopies,c->regularvalidcopies);
			c->allvalidcopies++;
		} else {
			s->valid = VALID;
			s->version = c->version;
			chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies+1,c->regularvalidcopies,c->regularvalidcopies+1);
			c->allvalidcopies++;
			c->regularvalidcopies++;
		}
	}
	s->next = c->slisthead;
	c->slisthead = s;
	c->needverincrease = 1;
	if (version&0x80000000) {
		chunk_mfr_state_check(c);
	}
}
2531
chunk_damaged(uint16_t csid,uint64_t chunkid)2532 void chunk_damaged(uint16_t csid,uint64_t chunkid) {
2533 chunk *c;
2534 slist *s;
2535 c = chunk_find(chunkid);
2536 if (c==NULL) {
2537 if (chunkid>nextchunkid+UINT64_C(1000000000)) {
2538 syslog(LOG_WARNING,"chunkserver has nonexistent chunk (%016"PRIX64"), id looks wrong - just ignore it",chunkid);
2539 return;
2540 }
2541 syslog(LOG_WARNING,"chunkserver has nonexistent chunk (%016"PRIX64"), so create it for future deletion",chunkid);
2542 if (chunkid>=nextchunkid) {
2543 nextchunkid=chunkid+1;
2544 // changelog("%"PRIu32"|NEXTCHUNKID(%"PRIu64")",main_time(),nextchunkid);
2545 }
2546 c = chunk_new(chunkid);
2547 c->version = 0;
2548 changelog("%"PRIu32"|CHUNKADD(%"PRIu64",%"PRIu32",%"PRIu32")",main_time(),c->chunkid,c->version,c->lockedto);
2549 }
2550 for (s=c->slisthead ; s ; s=s->next) {
2551 if (s->csid==csid) {
2552 if (s->valid==TDBUSY || s->valid==TDVALID) {
2553 chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies-1,c->regularvalidcopies,c->regularvalidcopies);
2554 c->allvalidcopies--;
2555 }
2556 if (s->valid==BUSY || s->valid==VALID) {
2557 chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies-1,c->regularvalidcopies,c->regularvalidcopies-1);
2558 c->allvalidcopies--;
2559 c->regularvalidcopies--;
2560 }
2561 if (c->writeinprogress && s->valid!=INVALID && s->valid!=DEL && s->valid!=WVER && s->valid!=TDWVER) {
2562 matocsserv_write_counters(cstab[csid].ptr,0);
2563 }
2564 s->valid = INVALID;
2565 s->version = 0;
2566 c->needverincrease = 1;
2567 chunk_priority_queue_check(c,1);
2568 chunk_mfr_state_check(c);
2569 return;
2570 }
2571 }
2572 s = slist_malloc();
2573 s->csid = csid;
2574 s->valid = INVALID;
2575 s->version = 0;
2576 s->next = c->slisthead;
2577 c->needverincrease = 1;
2578 c->slisthead = s;
2579 }
2580
chunk_lost(uint16_t csid,uint64_t chunkid)2581 void chunk_lost(uint16_t csid,uint64_t chunkid) {
2582 chunk *c;
2583 slist **sptr,*s;
2584
2585 c = chunk_find(chunkid);
2586 if (c==NULL) {
2587 return;
2588 }
2589 sptr=&(c->slisthead);
2590 while ((s=*sptr)) {
2591 if (s->csid==csid) {
2592 if (s->valid==TDBUSY || s->valid==TDVALID) {
2593 chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies-1,c->regularvalidcopies,c->regularvalidcopies);
2594 c->allvalidcopies--;
2595 }
2596 if (s->valid==BUSY || s->valid==VALID) {
2597 chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies-1,c->regularvalidcopies,c->regularvalidcopies-1);
2598 c->allvalidcopies--;
2599 c->regularvalidcopies--;
2600 }
2601 if (c->writeinprogress && s->valid!=INVALID && s->valid!=DEL && s->valid!=WVER && s->valid!=TDWVER) {
2602 matocsserv_write_counters(cstab[csid].ptr,0);
2603 }
2604 c->needverincrease = 1;
2605 *sptr = s->next;
2606 slist_free(s);
2607
2608 cstab[csid].lostchunkdelay = LOSTCHUNKDELAY;
2609 csreceivingchunks |= 1;
2610
2611 } else {
2612 sptr = &(s->next);
2613 }
2614 }
2615 if (c->lockedto<(uint32_t)main_time() && c->slisthead==NULL && c->fcount==0 && c->ondangerlist==0 && chunk_counters_in_progress()==0 && csdb_have_all_servers()) {
2616 changelog("%"PRIu32"|CHUNKDEL(%"PRIu64",%"PRIu32")",main_time(),c->chunkid,c->version);
2617 chunk_delete(c);
2618 } else {
2619 chunk_priority_queue_check(c,1);
2620 chunk_mfr_state_check(c);
2621 }
2622 }
2623
chunk_get_mfrstatus(uint16_t csid)2624 uint8_t chunk_get_mfrstatus(uint16_t csid) {
2625 if (csid<MAXCSCOUNT) {
2626 switch (cstab[csid].mfr_state) {
2627 case UNKNOWN_HARD:
2628 case UNKNOWN_SOFT:
2629 return 0;
2630 case CAN_BE_REMOVED:
2631 return 2;
2632 case REPL_IN_PROGRESS:
2633 case WAS_IN_PROGRESS:
2634 return 1;
2635
2636 }
2637 }
2638 return 0;
2639 }
2640
chunk_server_remove_csid(uint16_t csid)2641 static inline void chunk_server_remove_csid(uint16_t csid) {
2642 // remove from used list
2643 if (cstab[csid].prev<MAXCSCOUNT) {
2644 cstab[cstab[csid].prev].next = cstab[csid].next;
2645 } else {
2646 csusedhead = cstab[csid].next;
2647 }
2648 if (cstab[csid].next<MAXCSCOUNT) {
2649 cstab[cstab[csid].next].prev = cstab[csid].prev;
2650 }
2651 // append to free list
2652 cstab[csid].next = MAXCSCOUNT;
2653 cstab[csid].prev = csfreetail;
2654 cstab[csfreetail].next = csid;
2655 csfreetail = csid;
2656
2657 // cstab[csid].next = csfreehead;
2658 // cstab[csid].prev = MAXCSCOUNT;
2659 // cstab[csfreehead].prev = csid;
2660 // csfreehead = csid;
2661 }
2662
chunk_server_new_csid(void)2663 static inline uint16_t chunk_server_new_csid(void) {
2664 uint16_t csid;
2665
2666 // take first element from free list
2667 csid = csfreehead;
2668 csfreehead = cstab[csid].next;
2669 cstab[csfreehead].prev = MAXCSCOUNT;
2670 // add to used list
2671 if (csusedhead<MAXCSCOUNT) {
2672 cstab[csusedhead].prev = csid;
2673 }
2674 cstab[csid].next = csusedhead;
2675 // cstab[csid].prev = MAXCSCOUNT; // not necessary - it was first element in free list
2676 csusedhead = csid;
2677 return csid;
2678 }
2679
chunk_server_check_delays(void)2680 static inline void chunk_server_check_delays(void) {
2681 uint16_t csid;
2682
2683 csreceivingchunks = 0;
2684 for (csid = csusedhead ; csid < MAXCSCOUNT ; csid = cstab[csid].next) {
2685 if (cstab[csid].newchunkdelay>0) {
2686 cstab[csid].newchunkdelay--;
2687 csreceivingchunks |= 2;
2688 }
2689 if (cstab[csid].lostchunkdelay>0) {
2690 cstab[csid].lostchunkdelay--;
2691 csreceivingchunks |= 1;
2692 }
2693 }
2694 }
2695
chunk_server_connected(void * ptr)2696 uint16_t chunk_server_connected(void *ptr) {
2697 uint16_t csid;
2698
2699 csid = chunk_server_new_csid();
2700 cstab[csid].ptr = ptr;
2701 cstab[csid].opchunks = NULL;
2702 cstab[csid].valid = 1;
2703 cstab[csid].registered = 0;
2704 cstab[csid].mfr_state = UNKNOWN_HARD;
2705 csregisterinprogress += 1;
2706 return csid;
2707 }
2708
chunk_server_register_end(uint16_t csid)2709 void chunk_server_register_end(uint16_t csid) {
2710 if (cstab[csid].registered==0 && cstab[csid].valid==1) {
2711 cstab[csid].registered = 1;
2712 csregisterinprogress -= 1;
2713 }
2714 if (csregisterinprogress==0) {
2715 matoclserv_fuse_invalidate_chunk_cache();
2716 }
2717 }
2718
chunk_server_disconnected(uint16_t csid)2719 void chunk_server_disconnected(uint16_t csid) {
2720 discserv *ds;
2721 csopchunk *csop;
2722 chunk *c;
2723
2724 ds = malloc(sizeof(discserv));
2725 ds->csid = csid;
2726 ds->next = discservers_next;
2727 discservers_next = ds;
2728 fs_cs_disconnected();
2729 cstab[csid].valid = 0;
2730 if (cstab[csid].registered==0) {
2731 csregisterinprogress -= 1;
2732 }
2733 csop = cstab[csid].opchunks;
2734 while (csop) {
2735 c = chunk_find(csop->chunkid);
2736 if (c) {
2737 chunk_remove_disconnected_chunks(c);
2738 }
2739 csop = csop->next;
2740 if (opsinprogress>0) {
2741 opsinprogress--;
2742 }
2743 }
2744 cstab[csid].opchunks = NULL;
2745
2746 for (csid = csusedhead ; csid < MAXCSCOUNT ; csid = cstab[csid].next) {
2747 cstab[csid].mfr_state = UNKNOWN_HARD;
2748 }
2749 matoclserv_fuse_invalidate_chunk_cache();
2750 }
2751
chunk_server_disconnection_loop(void)2752 void chunk_server_disconnection_loop(void) {
2753 uint32_t i;
2754 chunk *c,*cn;
2755 discserv *ds;
2756 uint64_t startutime,currutime;
2757 static uint32_t discserverspos = 0;
2758
2759 if (discservers) {
2760 startutime = monotonic_useconds();
2761 currutime = startutime;
2762 while (startutime+10000>currutime) {
2763 for (i=0 ; i<100 ; i++) {
2764 if (discserverspos<chunkrehashpos) {
2765 for (c=chunkhashtab[discserverspos>>HASHTAB_LOBITS][discserverspos&HASHTAB_MASK] ; c ; c=cn ) {
2766 cn = c->next;
2767 chunk_remove_disconnected_chunks(c);
2768 }
2769 discserverspos++;
2770 } else {
2771 while (discservers) {
2772 ds = discservers;
2773 discservers = ds->next;
2774 chunk_server_remove_csid(ds->csid);
2775 matocsserv_disconnection_finished(cstab[ds->csid].ptr);
2776 free(ds);
2777 }
2778 return;
2779 }
2780 }
2781 currutime = monotonic_useconds();
2782 }
2783 } else if (discservers_next) {
2784 discservers = discservers_next;
2785 discservers_next = NULL;
2786 discserverspos = 0;
2787 }
2788 }
2789
chunk_got_delete_status(uint16_t csid,uint64_t chunkid,uint8_t status)2790 void chunk_got_delete_status(uint16_t csid,uint64_t chunkid,uint8_t status) {
2791 chunk *c;
2792 slist *s,**st;
2793
2794 if (status==MFS_STATUS_OK || status==MFS_ERROR_NOCHUNK) {
2795 stats_chunkops[CHUNK_OP_DELETE_OK]++;
2796 } else {
2797 stats_chunkops[CHUNK_OP_DELETE_ERR]++;
2798 }
2799
2800 c = chunk_find(chunkid);
2801 if (c==NULL) {
2802 return ;
2803 }
2804 if (status!=MFS_STATUS_OK && status!=MFS_ERROR_NOCHUNK) { // treat here MFS_ERROR_NOCHUNK as ok
2805 return ;
2806 }
2807 st = &(c->slisthead);
2808 while (*st) {
2809 s = *st;
2810 if (s->csid == csid) {
2811 if (s->valid!=DEL) {
2812 if (s->valid==TDBUSY || s->valid==TDVALID) {
2813 chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies-1,c->regularvalidcopies,c->regularvalidcopies);
2814 c->allvalidcopies--;
2815 }
2816 if (s->valid==BUSY || s->valid==VALID) {
2817 chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies-1,c->regularvalidcopies,c->regularvalidcopies-1);
2818 c->allvalidcopies--;
2819 c->regularvalidcopies--;
2820 }
2821 if (c->writeinprogress && s->valid!=INVALID && s->valid!=DEL && s->valid!=WVER && s->valid!=TDWVER) {
2822 matocsserv_write_counters(cstab[s->csid].ptr,0);
2823 }
2824 syslog(LOG_WARNING,"got unexpected delete status");
2825 }
2826 *st = s->next;
2827 slist_free(s);
2828 } else {
2829 st = &(s->next);
2830 }
2831 }
2832 if (c->lockedto<(uint32_t)main_time() && c->slisthead==NULL && c->fcount==0 && c->ondangerlist==0 && chunk_counters_in_progress()==0 && csdb_have_all_servers()) {
2833 changelog("%"PRIu32"|CHUNKDEL(%"PRIu64",%"PRIu32")",main_time(),c->chunkid,c->version);
2834 chunk_delete(c);
2835 }
2836 }
2837
/* Process the result of a replication request executed by chunkserver csid.
 *
 * csid    - server that was asked to create the replica
 * chunkid - replicated chunk
 * version - version reported for the replica
 * status  - MFS_STATUS_OK (0) on success, anything else is a failure
 *
 * Two paths exist, selected by c->operation:
 *  - REPLICATE ("high priority"): the copy entry already exists (normally in
 *    state BUSY); on failure it is removed or invalidated, on success it
 *    becomes VALID; the chunk operation is then closed and the chunk unlocked.
 *  - anything else ("low priority"): on success a new copy entry is created
 *    for csid, unless the server already had one.
 * In both cases the chunk's priority-queue state is re-checked at the end. */
void chunk_got_replicate_status(uint16_t csid,uint64_t chunkid,uint32_t version,uint8_t status) {
	chunk *c;
	slist *s,**st;
	uint8_t fix;

	if (status==MFS_STATUS_OK) {
		stats_chunkops[CHUNK_OP_REPLICATE_OK]++;
	} else {
		stats_chunkops[CHUNK_OP_REPLICATE_ERR]++;
	}

	c = chunk_find(chunkid);
	if (c==NULL) {
		return ;
	}
	if (c->operation==REPLICATE) { // high priority replication
		fix = 0; // set when a failed BUSY copy of csid has been dropped
		if (status!=0) { // chunk hasn't been replicated (error occurred) - simply remove it from copies
			st = &(c->slisthead);
			while (*st) {
				s = *st;
				if (s->csid==csid && s->valid==BUSY) {
					chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies-1,c->regularvalidcopies,c->regularvalidcopies-1);
					c->allvalidcopies--;
					c->regularvalidcopies--;
					if (c->writeinprogress) {
						matocsserv_write_counters(cstab[s->csid].ptr,0);
					}
					fix = 1;
					*st = s->next;
					chunk_delopchunk(s->csid,c->chunkid);
					slist_free(s);
				} else {
					st = &(s->next);
				}
			}
		}
		if (fix==0) {
			// either success, or a failure for which no BUSY copy of csid was found
			for (s=c->slisthead ; s ; s=s->next) {
				if (s->csid == csid) {
					if (s->valid!=BUSY) {
						syslog(LOG_WARNING,"got replication status from server not set as busy !!!");
					}
					if (status!=0 || version!=c->version) {
						// failed or wrong version: take the copy out of the counters and invalidate it
						if (s->valid==TDBUSY || s->valid==TDVALID) {
							chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies-1,c->regularvalidcopies,c->regularvalidcopies);
							c->allvalidcopies--;
						}
						if (s->valid==BUSY || s->valid==VALID) {
							chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies-1,c->regularvalidcopies,c->regularvalidcopies-1);
							c->allvalidcopies--;
							c->regularvalidcopies--;
						}
						if (c->writeinprogress && s->valid!=INVALID && s->valid!=DEL && s->valid!=WVER && s->valid!=TDWVER) {
							matocsserv_write_counters(cstab[s->csid].ptr,0);
						}
						s->valid = INVALID;
						s->version = 0; // after unfinished operation can't be sure what version chunk has
					} else {
						// BUSY -> VALID needs no counter change: both states are counted the same way
						if (s->valid == BUSY || s->valid == VALID) {
							s->valid = VALID;
						}
					}
					chunk_delopchunk(s->csid,c->chunkid);
				} else if (s->valid==BUSY) {
					syslog(LOG_WARNING,"got replication status from one server, but another is set as busy !!!");
				}
			}
		}
		// operation finished either way: clear it and release the lock
		c->operation = NONE;
		c->lockedto = 0;
		matoclserv_chunk_unlocked(c->chunkid,c);
	} else { // low priority replication
		if (status!=0) {
			chunk_priority_queue_check(c,1);
			return ;
		}
		for (s=c->slisthead ; s ; s=s->next) {
			if (s->csid == csid) {
				syslog(LOG_WARNING,"got replication status from server which had had that chunk before (chunk:%016"PRIX64"_%08"PRIX32")",chunkid,version);
				if (s->valid==VALID && version!=c->version) {
					chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies-1,c->regularvalidcopies,c->regularvalidcopies-1);
					c->allvalidcopies--;
					c->regularvalidcopies--;
					s->valid = INVALID;
					s->version = version;
					if (c->writeinprogress) {
						matocsserv_write_counters(cstab[s->csid].ptr,0);
					}
				}
				chunk_priority_queue_check(c,1);
				return;
			}
		}
		// new replica: it is VALID only if the chunk's lock has expired and the versions agree
		s = slist_malloc();
		s->csid = csid;
		if (c->lockedto>=(uint32_t)main_time() || version!=c->version) {
			s->valid = INVALID;
		} else {
			chunk_write_counters(c,0);
			chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies+1,c->regularvalidcopies,c->regularvalidcopies+1);
			c->allvalidcopies++;
			c->regularvalidcopies++;
			s->valid = VALID;
		}
		s->version = version;
		s->next = c->slisthead;
		c->slisthead = s;
	}
	chunk_priority_queue_check(c,1);
}
2949
2950
chunk_operation_status(uint64_t chunkid,uint8_t status,uint16_t csid,uint8_t operation)2951 void chunk_operation_status(uint64_t chunkid,uint8_t status,uint16_t csid,uint8_t operation) {
2952 chunk *c;
2953 uint8_t opfinished,validcopies;
2954 uint8_t verfixed;
2955 slist *s,**st;
2956
2957 if (status==MFS_STATUS_OK) {
2958 if (operation==CREATE) {
2959 stats_chunkops[CHUNK_OP_CREATE_OK]++;
2960 } else {
2961 stats_chunkops[CHUNK_OP_CHANGE_OK]++;
2962 }
2963 } else {
2964 if (operation==CREATE) {
2965 stats_chunkops[CHUNK_OP_CREATE_ERR]++;
2966 } else {
2967 stats_chunkops[CHUNK_OP_CHANGE_ERR]++;
2968 }
2969 }
2970
2971 c = chunk_find(chunkid);
2972 if (c==NULL) {
2973 return ;
2974 }
2975
2976 if (chunk_remove_disconnected_chunks(c)) {
2977 return;
2978 }
2979 // for (s=c->slisthead ; s ; s=s->next) {
2980 // if (!cstab[s->csid].valid) {
2981 // c->interrupted = 1;
2982 // }
2983 // }
2984
2985 if (c->operation!=operation && operation!=NONE) {
2986 uint8_t eop,sop;
2987 eop = c->operation;
2988 sop = operation;
2989 if (eop>7) {
2990 eop=7;
2991 }
2992 if (sop>7) {
2993 sop=7;
2994 }
2995 syslog(LOG_WARNING,"chunk %016"PRIX64"_%08"PRIX32" - got unexpected status (expected: %s ; got: %s)",chunkid,c->version,opstr[eop],opstr[sop]);
2996 }
2997
2998 validcopies = 0;
2999 opfinished = 1;
3000
3001 st = &(c->slisthead);
3002 while ((s=*st)!=NULL) {
3003 if (s->csid == csid) {
3004 if (status!=0) {
3005 c->interrupted = 1; // increase version after finish, just in case
3006 if (s->valid==TDBUSY || s->valid==TDVALID) {
3007 chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies-1,c->regularvalidcopies,c->regularvalidcopies);
3008 c->allvalidcopies--;
3009 }
3010 if (s->valid==BUSY || s->valid==VALID) {
3011 chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies-1,c->regularvalidcopies,c->regularvalidcopies-1);
3012 c->allvalidcopies--;
3013 c->regularvalidcopies--;
3014 }
3015 if (c->writeinprogress && s->valid!=INVALID && s->valid!=DEL && s->valid!=WVER && s->valid!=TDWVER) {
3016 matocsserv_write_counters(cstab[s->csid].ptr,0);
3017 }
3018 if (status==MFS_ERROR_NOTDONE) { // special case - this operation was not even started, so we know exact result
3019 if (c->operation==SET_VERSION || c->operation==TRUNCATE) { // chunk left not changed, but now it has wrong version
3020 if (s->valid==TDBUSY || s->valid==TDVALID) {
3021 s->valid = TDWVER;
3022 } else {
3023 s->valid = WVER;
3024 }
3025 s->version--;
3026 } else if (c->operation==CREATE || c->operation==DUPLICATE || c->operation==DUPTRUNC) { // copy not created
3027 *st = s->next;
3028 slist_free(s);
3029 continue;
3030 }
3031 } else {
3032 s->valid = INVALID;
3033 s->version = 0; // after unfinished operation can't be sure what version chunk has
3034 }
3035 } else {
3036 if (s->valid==TDBUSY || s->valid==TDVALID) {
3037 s->valid=TDVALID;
3038 } else {
3039 s->valid=VALID;
3040 }
3041 }
3042 chunk_statusopchunk(s->csid,c->chunkid,status);
3043 // chunk_delopchunk(s->csid,c->chunkid);
3044 }
3045 if (s->valid==BUSY || s->valid==TDBUSY) {
3046 opfinished=0;
3047 }
3048 if (s->valid==VALID || s->valid==TDVALID) {
3049 validcopies=1;
3050 }
3051 st = &(s->next);
3052 }
3053 if (opfinished && validcopies==0 && (c->operation==SET_VERSION || c->operation==TRUNCATE)) { // we know that version increase was just not completed, so all WVER chunks with version exactly one lower than chunk version are actually VALID copies
3054 verfixed = 0;
3055 for (s=c->slisthead ; s!=NULL ; s=s->next) {
3056 if (s->version+1==c->version) {
3057 if (s->valid==TDWVER) {
3058 verfixed = 1;
3059 s->valid = TDVALID;
3060 chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies+1,c->regularvalidcopies,c->regularvalidcopies);
3061 c->allvalidcopies++;
3062 } else if (s->valid==WVER) {
3063 verfixed = 1;
3064 s->valid = VALID;
3065 chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies+1,c->regularvalidcopies,c->regularvalidcopies+1);
3066 c->allvalidcopies++;
3067 c->regularvalidcopies++;
3068 }
3069 }
3070 if (verfixed) {
3071 c->version--;
3072 changelog("%"PRIu32"|SETVERSION(%"PRIu64",%"PRIu32")",(uint32_t)main_time(),c->chunkid,c->version);
3073 }
3074 }
3075 // we continue because we still want to return status not done to matoclserv module
3076 }
3077 if (opfinished) {
3078 uint8_t nospace;
3079 nospace = 1;
3080 for (s=c->slisthead ; s ; s=s->next) {
3081 status = chunk_delopchunk(s->csid,chunkid);
3082 if (status!=MFS_ERROR_MISMATCH && status!=MFS_ERROR_NOSPACE) {
3083 nospace = 0;
3084 }
3085 }
3086 if (validcopies) {
3087 // syslog(LOG_NOTICE,"operation finished, chunk: %016"PRIX64" ; op: %s ; interrupted: %u",c->chunkid,opstr[c->operation],c->interrupted);
3088 if (c->interrupted) {
3089 chunk_emergency_increase_version(c);
3090 } else {
3091 c->operation = NONE;
3092 c->needverincrease = 0;
3093 matoclserv_chunk_status(c->chunkid,MFS_STATUS_OK);
3094 if (c->lockedto==0) {
3095 matoclserv_chunk_unlocked(c->chunkid,c);
3096 }
3097 }
3098 } else {
3099 if (nospace) {
3100 matoclserv_chunk_status(c->chunkid,MFS_ERROR_NOSPACE);
3101 } else {
3102 matoclserv_chunk_status(c->chunkid,MFS_ERROR_NOTDONE);
3103 }
3104 c->operation = NONE;
3105 }
3106 }
3107 }
3108
/* Forward a chunkserver's status to chunk_operation_status() with operation
 * NONE, which disables that function's "unexpected status" comparison. */
void chunk_got_chunkop_status(uint16_t csid,uint64_t chunkid,uint8_t status) {
	chunk_operation_status(chunkid,status,csid,NONE);
}
3112
/* Forward a chunkserver's status to chunk_operation_status() as the result
 * of a CREATE operation. */
void chunk_got_create_status(uint16_t csid,uint64_t chunkid,uint8_t status) {
	chunk_operation_status(chunkid,status,csid,CREATE);
}
3116
/* Forward a chunkserver's status to chunk_operation_status() as the result
 * of a DUPLICATE operation. */
void chunk_got_duplicate_status(uint16_t csid,uint64_t chunkid,uint8_t status) {
	chunk_operation_status(chunkid,status,csid,DUPLICATE);
}
3120
/* Forward a chunkserver's status to chunk_operation_status() as the result
 * of a SET_VERSION operation. */
void chunk_got_setversion_status(uint16_t csid,uint64_t chunkid,uint8_t status) {
	chunk_operation_status(chunkid,status,csid,SET_VERSION);
}
3124
chunk_got_truncate_status(uint16_t csid,uint64_t chunkid,uint8_t status)3125 void chunk_got_truncate_status(uint16_t csid,uint64_t chunkid,uint8_t status) {
3126 chunk_operation_status(chunkid,status,csid,TRUNCATE);
3127 }
3128
chunk_got_duptrunc_status(uint16_t csid,uint64_t chunkid,uint8_t status)3129 void chunk_got_duptrunc_status(uint16_t csid,uint64_t chunkid,uint8_t status) {
3130 chunk_operation_status(chunkid,status,csid,DUPTRUNC);
3131 }
3132
chunk_no_more_pending_jobs(void)3133 uint8_t chunk_no_more_pending_jobs(void) {
3134 return (opsinprogress==0)?1:0;
3135 }
3136
3137 /* ----------------------- */
3138 /* JOBS (DELETE/REPLICATE) */
3139 /* ----------------------- */
3140
chunk_store_info(uint8_t * buff)3141 void chunk_store_info(uint8_t *buff) {
3142 put32bit(&buff,chunksinfo_loopstart);
3143 put32bit(&buff,chunksinfo_loopend);
3144 put32bit(&buff,chunksinfo.done.del_invalid);
3145 put32bit(&buff,chunksinfo.notdone.del_invalid);
3146 put32bit(&buff,chunksinfo.done.del_unused);
3147 put32bit(&buff,chunksinfo.notdone.del_unused);
3148 put32bit(&buff,chunksinfo.done.del_diskclean);
3149 put32bit(&buff,chunksinfo.notdone.del_diskclean);
3150 put32bit(&buff,chunksinfo.done.del_overgoal);
3151 put32bit(&buff,chunksinfo.notdone.del_overgoal);
3152 put32bit(&buff,chunksinfo.done.copy_undergoal);
3153 put32bit(&buff,chunksinfo.notdone.copy_undergoal);
3154 put32bit(&buff,chunksinfo.done.copy_wronglabels);
3155 put32bit(&buff,chunksinfo.notdone.copy_wronglabels);
3156 put32bit(&buff,chunksinfo.copy_rebalance);
3157 put32bit(&buff,chunksinfo.labels_dont_match);
3158 put32bit(&buff,chunksinfo.locked_unused);
3159 put32bit(&buff,chunksinfo.locked_used);
3160 }
3161
chunk_mindist(uint16_t csid,uint32_t ip[255],uint8_t ipcnt)3162 static inline uint8_t chunk_mindist(uint16_t csid,uint32_t ip[255],uint8_t ipcnt) {
3163 uint8_t mindist,dist,k;
3164 uint32_t sip;
3165 mindist = TOPOLOGY_DIST_MAX;
3166 if (matocsserv_get_csdata(cstab[csid].ptr,&sip,NULL,NULL,NULL)==0) {
3167 for (k=0 ; k<ipcnt && mindist>0 ; k++) {
3168 dist=topology_distance(sip,ip[k]);
3169 if (dist<mindist) {
3170 mindist = dist;
3171 }
3172 }
3173 }
3174 return mindist;
3175 }
3176
3177 // first servers in the same rack (server id), then other (yes, same rack is better than same physical server, so order is 1 and then 0 or 2)
chunk_rack_sort(uint16_t servers[MAXCSCOUNT],uint16_t servcount,uint32_t ip[255],uint8_t ipcnt)3178 static inline void chunk_rack_sort(uint16_t servers[MAXCSCOUNT],uint16_t servcount,uint32_t ip[255],uint8_t ipcnt) {
3179 uint16_t i,j;
3180 uint16_t csid;
3181 uint8_t mindist;
3182
3183 if (servcount==0 || ipcnt==0) {
3184 return;
3185 }
3186 i = 0;
3187 j = servcount-1;
3188 while (i<j) {
3189 while (i<j) {
3190 mindist = chunk_mindist(servers[i],ip,ipcnt);
3191 if (mindist!=TOPOLOGY_DIST_SAME_RACKID) {
3192 break;
3193 }
3194 i++;
3195 }
3196 while (i<j) {
3197 mindist = chunk_mindist(servers[j],ip,ipcnt);
3198 if (mindist==TOPOLOGY_DIST_SAME_RACKID) {
3199 break;
3200 }
3201 j--;
3202 }
3203 if (i<j) {
3204 csid = servers[i];
3205 servers[i] = servers[j];
3206 servers[j] = csid;
3207 }
3208 }
3209 }
3210
/*
 * Chooses the chunkserver that will be used as the source of a replication of
 * chunk 'c' to chunkserver 'dstcsid'.
 *
 * c       - chunk to replicate
 * dstcsid - index (in cstab) of the destination chunkserver
 * now     - current time, passed to matocsserv_replication_read_counter()
 * lclass  - index into MaxReadRepl (which read-replication limit applies)
 * rgvc    - number of VALID copies eligible as a source; used only when
 *           ReplicationsRespectTopology is off
 * rgtdc   - number of TDVALID copies eligible as a source; used only when
 *           ReplicationsRespectTopology is off
 *
 * A copy is eligible when it is VALID or TDVALID and the read replication
 * counter of its server is below MaxReadRepl[lclass].
 *
 * With ReplicationsRespectTopology set, only the eligible copies closest to
 * the destination (topology_distance) are taken into account. Among them a
 * VALID copy is always preferred, and TDVALID copies are used only when no
 * VALID copy exists at that distance. When several candidates remain, one of
 * them is picked at random.
 *
 * Without ReplicationsRespectTopology a random eligible VALID copy is picked
 * when rgvc>0, otherwise a random eligible TDVALID copy.
 *
 * returns: csid of the chosen source, or MAXCSCOUNT when no source was found
 */
static inline uint16_t chunk_get_undergoal_replicate_srccsid(chunk *c,uint16_t dstcsid,uint32_t now,uint32_t lclass,uint32_t rgvc,uint32_t rgtdc) {
	slist *s;
	uint32_t r = 0;
	uint16_t srccsid = MAXCSCOUNT;

	if (ReplicationsRespectTopology) {
		uint32_t min_dist = 0xFFFFFFFF;
		uint32_t dist;
		uint32_t ip;
		uint32_t cuip;
		uint32_t mdrgvc = 0;	// eligible VALID copies at minimal distance
		uint32_t mdrgtdc = 0;	// eligible TDVALID copies at minimal distance

		if (matocsserv_get_csdata(cstab[dstcsid].ptr,&cuip,NULL,NULL,NULL)==0) {
			// pass 1: find minimal distance, count candidates at that distance and remember a default source
			for (s=c->slisthead ; s ; s=s->next) {
				if (matocsserv_replication_read_counter(cstab[s->csid].ptr,now)<MaxReadRepl[lclass] && (s->valid==VALID || s->valid==TDVALID)) {
					if (matocsserv_get_csdata(cstab[s->csid].ptr,&ip,NULL,NULL,NULL)==0) {
						dist=topology_distance(ip,cuip);
						if (min_dist>=dist) {
							if (min_dist>dist) { // strictly closer copy - restart counting
								min_dist=dist;
								srccsid=s->csid;
								mdrgvc = 0;
								mdrgtdc = 0;
							} else if (s->valid==VALID) { // same distance - VALID copy overrides previous choice
								srccsid=s->csid;
							}
							if (s->valid==VALID) {
								mdrgvc++;
							} else {
								mdrgtdc++;
							}
						}
					}
				}
			}
			// with exactly one candidate of the preferred kind the default from pass 1 is already correct (r stays 0)
			if (mdrgvc > 1) {
				r = 1+rndu32_ranged(mdrgvc);
			} else if (mdrgvc == 0 && mdrgtdc > 1) { // we have to choose TDVALID chunks
				r = 1+rndu32_ranged(mdrgtdc);
			}
			// pass 2: take r-th candidate of the preferred kind
			for (s=c->slisthead ; s && r>0 ; s=s->next) {
				if (matocsserv_replication_read_counter(cstab[s->csid].ptr,now)<MaxReadRepl[lclass] && (s->valid==VALID || s->valid==TDVALID)) {
					if (matocsserv_get_csdata(cstab[s->csid].ptr,&ip,NULL,NULL,NULL)==0) {
						dist=topology_distance(ip,cuip);
						if (min_dist==dist) {
							if (mdrgvc > 1 && s->valid==VALID) {
								r--;
								srccsid=s->csid;
							} else if (mdrgvc == 0 && mdrgtdc > 1 && s->valid==TDVALID) {
								// TDVALID copies are counted only when there is no VALID copy at this distance;
								// r was drawn from the number of VALID copies otherwise, so counting TDVALID
								// here too could select a "marked for removal" copy over a VALID one
								r--;
								srccsid=s->csid;
							}
						}
					}
				}
			}
		}
	} else {
		if (rgvc>0) { // if there are VALID copies then make copy of one VALID chunk
			r = 1+rndu32_ranged(rgvc);
			for (s=c->slisthead ; s && r>0 ; s=s->next) {
				if (matocsserv_replication_read_counter(cstab[s->csid].ptr,now)<MaxReadRepl[lclass] && s->valid==VALID) {
					r--;
					srccsid = s->csid;
				}
			}
		} else { // if not then use TDVALID chunks.
			r = 1+rndu32_ranged(rgtdc);
			for (s=c->slisthead ; s && r>0 ; s=s->next) {
				if (matocsserv_replication_read_counter(cstab[s->csid].ptr,now)<MaxReadRepl[lclass] && s->valid==TDVALID) {
					r--;
					srccsid = s->csid;
				}
			}
		}
	}

	return srccsid;
}
3291
/*
 * Tries to start a replication of chunk 'c' to chunkserver 'dstcsid'.
 *
 * The source server is selected by chunk_get_undergoal_replicate_srccsid().
 * After the replicate command has been sent, the chunk is marked as being in
 * the REPLICATE operation, locked until now+LOCKTIMEOUT, and a new BUSY copy
 * for 'dstcsid' is put at the head of its copy list; valid copy counters and
 * loop statistics are updated accordingly.
 *
 * c            - chunk to replicate
 * dstcsid      - index (in cstab) of the destination chunkserver
 * now          - current time
 * lclass       - replication limit class (forwarded to source selection)
 * priority     - priority level of this job; values below 5 are accounted as
 *                "copy_undergoal", the rest as "copy_wronglabels"
 *                (NOTE(review): 5 presumably equals DPRIORITY_WRONGLABEL - confirm)
 * inforec      - loop statistics record to update
 * rgvc, rgtdc  - numbers of eligible VALID / TDVALID source copies
 *                (forwarded to source selection)
 *
 * returns: -1 when no source server is available,
 *           1 when sending the replicate command failed,
 *           0 when replication has been started
 */
static inline int chunk_undergoal_replicate(chunk *c,uint16_t dstcsid,uint32_t now,uint32_t lclass,uint16_t priority,loop_info *inforec,uint32_t rgvc,uint32_t rgtdc) {
	uint16_t src;
	slist *copy;

	src = chunk_get_undergoal_replicate_srccsid(c,dstcsid,now,lclass,rgvc,rgtdc);
	if (src==MAXCSCOUNT) {
		return -1;
	}

	stats_chunkops[CHUNK_OP_REPLICATE_TRY]++;
	// high priority replication
	if (matocsserv_send_replicatechunk(cstab[dstcsid].ptr,c->chunkid,c->version,cstab[src].ptr)<0) {
		syslog(LOG_WARNING,"chunk %016"PRIX64"_%08"PRIX32": error sending replicate command",c->chunkid,c->version);
		return 1;
	}

	// command sent - mark chunk as busy with replication
	chunk_addopchunk(dstcsid,c->chunkid);
	c->operation = REPLICATE;
	c->lockedto = now+LOCKTIMEOUT;

	// register the copy that is being created on the destination server
	copy = slist_malloc();
	copy->next = c->slisthead;
	copy->csid = dstcsid;
	copy->version = c->version;
	copy->valid = BUSY;
	c->slisthead = copy;

	// state change has to be reported with counters from before the increment
	chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies+1,c->regularvalidcopies,c->regularvalidcopies+1);
	c->allvalidcopies++;
	c->regularvalidcopies++;

	if (priority>=5) {
		inforec->done.copy_wronglabels++;
	} else {
		inforec->done.copy_undergoal++;
	}
	return 0;
}
3325
chunk_do_jobs(chunk * c,uint16_t scount,uint16_t fullservers,uint32_t now,uint8_t extrajob)3326 void chunk_do_jobs(chunk *c,uint16_t scount,uint16_t fullservers,uint32_t now,uint8_t extrajob) {
3327 slist *s;
3328 static uint16_t *dcsids = NULL;
3329 static uint16_t dservcount;
3330 // static uint16_t *bcsids;
3331 // static uint16_t bservcount;
3332 static uint16_t *rcsids = NULL;
3333 uint16_t servmaxpos[4];
3334 uint16_t rservcount;
3335 uint16_t srccsid,dstcsid;
3336 double repl_read_counter;
3337 uint16_t i,j,k;
3338 uint16_t prilevel;
3339 uint32_t vc,tdc,ivc,bc,tdb,dc,wvc,tdw;
3340 uint32_t goal;
3341 static loop_info inforec;
3342 static uint32_t delnotdone;
3343 static uint32_t deldone;
3344 static uint32_t prevtodeletecount;
3345 static uint32_t delloopcnt;
3346 uint32_t **labelmasks;
3347 uint32_t labelcnt;
3348 static uint16_t *servers = NULL;
3349 uint32_t servcnt,extraservcnt;
3350 int32_t *matching;
3351 uint32_t forcereplication;
3352 uint32_t vrip[255];
3353 uint8_t vripcnt;
3354 #ifdef MFSDEBUG
3355 uint8_t debug;
3356 #endif
3357
3358 if (servers==NULL) {
3359 servers = malloc(sizeof(uint16_t)*MAXCSCOUNT);
3360 passert(servers);
3361 }
3362 if (rcsids==NULL) {
3363 rcsids = malloc(sizeof(uint16_t)*MAXCSCOUNT);
3364 passert(rcsids);
3365 }
3366 if (dcsids==NULL) {
3367 dcsids = malloc(sizeof(uint16_t)*MAXCSCOUNT);
3368 passert(dcsids);
3369 }
3370 // if (bcsids==NULL) {
3371 // bcsids = malloc(sizeof(uint16_t)*MAXCSCOUNT);
3372 // passert(bcsids);
3373 // }
3374
3375 if (c==NULL) {
3376 if (scount==JOBS_INIT) { // init tasks
3377 delnotdone = 0;
3378 deldone = 0;
3379 prevtodeletecount = 0;
3380 delloopcnt = 0;
3381 memset(&inforec,0,sizeof(loop_info));
3382 dservcount = 0;
3383 } else if (scount==JOBS_EVERYLOOP) { // every loop tasks
3384 delloopcnt++;
3385 if (delloopcnt>=16) {
3386 uint32_t todeletecount = deldone+delnotdone;
3387 delloopcnt=0;
3388 if ((delnotdone > deldone) && (todeletecount > prevtodeletecount)) {
3389 TmpMaxDelFrac *= 1.5;
3390 if (TmpMaxDelFrac>MaxDelHardLimit) {
3391 syslog(LOG_NOTICE,"DEL_LIMIT hard limit (%"PRIu32" per server) reached",MaxDelHardLimit);
3392 TmpMaxDelFrac=MaxDelHardLimit;
3393 }
3394 TmpMaxDel = TmpMaxDelFrac;
3395 syslog(LOG_NOTICE,"DEL_LIMIT temporary increased to: %"PRIu32" per server",TmpMaxDel);
3396 }
3397 if ((todeletecount < prevtodeletecount) && (TmpMaxDelFrac > MaxDelSoftLimit)) {
3398 TmpMaxDelFrac /= 1.5;
3399 if (TmpMaxDelFrac<MaxDelSoftLimit) {
3400 syslog(LOG_NOTICE,"DEL_LIMIT back to soft limit (%"PRIu32" per server)",MaxDelSoftLimit);
3401 TmpMaxDelFrac = MaxDelSoftLimit;
3402 }
3403 TmpMaxDel = TmpMaxDelFrac;
3404 syslog(LOG_NOTICE,"DEL_LIMIT decreased back to: %"PRIu32" per server",TmpMaxDel);
3405 }
3406 prevtodeletecount = todeletecount;
3407 delnotdone = 0;
3408 deldone = 0;
3409 }
3410 chunksinfo = inforec;
3411 memset(&inforec,0,sizeof(inforec));
3412 chunksinfo_loopstart = chunksinfo_loopend;
3413 chunksinfo_loopend = now;
3414 } else if (scount==JOBS_EVERYTICK) { // every second tasks
3415 dservcount = 0;
3416 // bservcount=0;
3417 } else if (scount==JOBS_TERM) {
3418 if (servers!=NULL) {
3419 free(servers);
3420 }
3421 if (rcsids!=NULL) {
3422 free(rcsids);
3423 }
3424 if (dcsids!=NULL) {
3425 free(dcsids);
3426 }
3427 }
3428 return;
3429 }
3430
3431 // step 0. remove all disconnected copies from structures
3432 if (chunk_remove_disconnected_chunks(c)) {
3433 return;
3434 }
3435
3436 if (c->lockedto < now) {
3437 chunk_write_counters(c,0);
3438 }
3439
3440 // step 1. calculate number of valid and invalid copies
3441 vc = 0;
3442 tdc = 0;
3443 ivc = 0;
3444 bc = 0;
3445 tdb = 0;
3446 dc = 0;
3447 wvc = 0;
3448 tdw = 0;
3449 for (s=c->slisthead ; s ; s=s->next) {
3450 switch (s->valid) {
3451 case INVALID:
3452 ivc++;
3453 break;
3454 case TDVALID:
3455 tdc++;
3456 break;
3457 case VALID:
3458 vc++;
3459 break;
3460 case TDBUSY:
3461 tdb++;
3462 break;
3463 case BUSY:
3464 bc++;
3465 break;
3466 case DEL:
3467 dc++;
3468 break;
3469 case WVER:
3470 wvc++;
3471 break;
3472 case TDWVER:
3473 tdw++;
3474 break;
3475 }
3476 }
3477 #ifdef MFSDEBUG
3478 debug = ((c->chunkid&0xFFF)==0)?1:0;
3479 if (debug) {
3480 mfsdebug("chunk_do_jobs ; chunkid=%016"PRIX64" ; vc=%u ; tdc=%u ; bc=%u ; tdb=%u ; wvc=%u ; tdw=%u ; ivc=%u ; dc=%u",c->chunkid,vc,tdc,bc,tdb,wvc,tdw,ivc,dc);
3481 for (s=c->slisthead ; s ; s=s->next) {
3482 mfsdebug("chunk_do_jobs ; chunkid=%016"PRIX64" ; existing copy: valid=%s ; csid=%"PRIu16" ; ip=%s",c->chunkid,validstr[s->valid],s->csid,matocsserv_getstrip(cstab[s->csid].ptr));
3483 }
3484 }
3485 #endif
3486 if (c->allvalidcopies!=vc+tdc+bc+tdb) {
3487 syslog(LOG_WARNING,"chunk %016"PRIX64"_%08"PRIX32": wrong all valid copies counter - (counter value: %u, should be: %u) - fixed",c->chunkid,c->version,c->allvalidcopies,vc+tdc+bc+tdb);
3488 chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,vc+tdc+bc+tdb,c->regularvalidcopies,c->regularvalidcopies);
3489 c->allvalidcopies = vc+tdc+bc+tdb;
3490 }
3491 if (c->regularvalidcopies!=vc+bc) {
3492 syslog(LOG_WARNING,"chunk %016"PRIX64"_%08"PRIX32": wrong regular valid copies counter - (counter value: %u, should be: %u) - fixed",c->chunkid,c->version,c->regularvalidcopies,vc+bc);
3493 chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies,c->regularvalidcopies,vc+bc);
3494 c->regularvalidcopies = vc+bc;
3495 }
3496 if (tdb+bc==0 && c->operation!=NONE) {
3497 syslog(LOG_WARNING,"chunk %016"PRIX64"_%08"PRIX32": chunk in middle of operation %s, but no chunk server is busy - finish operation",c->chunkid,c->version,opstr[c->operation]);
3498 c->operation = NONE;
3499 }
3500 goal = sclass_get_keeparch_goal(c->sclassid,c->archflag);
3501 if (vc + bc < goal && tdc + tdb > 0) {
3502 for (s=c->slisthead ; s ; s=s->next) {
3503 if (s->valid == TDVALID || s->valid == TDBUSY) {
3504 cstab[s->csid].mfr_state = REPL_IN_PROGRESS;
3505 }
3506 }
3507 }
3508 if (c->lockedto < now) {
3509 if (tdb+bc>0 && c->operation==NONE) {
3510 if (tdc+vc>0) {
3511 syslog(LOG_WARNING,"chunk %016"PRIX64"_%08"PRIX32": unexpected BUSY copies - fixing",c->chunkid,c->version);
3512 for (s=c->slisthead ; s ; s=s->next) {
3513 if (s->valid == BUSY) {
3514 chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies-1,c->regularvalidcopies,c->regularvalidcopies-1);
3515 c->allvalidcopies--;
3516 c->regularvalidcopies--;
3517 s->valid = INVALID;
3518 s->version = 0;
3519 ivc++;
3520 bc--;
3521 } else if (s->valid == TDBUSY) {
3522 chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies-1,c->regularvalidcopies,c->regularvalidcopies);
3523 c->allvalidcopies--;
3524 s->valid = INVALID;
3525 s->version = 0;
3526 ivc++;
3527 tdb--;
3528 }
3529 }
3530 } else {
3531 syslog(LOG_WARNING,"chunk %016"PRIX64"_%08"PRIX32": unexpected BUSY copies - can't fix",c->chunkid,c->version);
3532 }
3533 }
3534
3535 /*
3536 #warning debug
3537 {
3538 static FILE *fd = NULL;
3539 static uint32_t cnt = 0;
3540 if (fd==NULL) {
3541 fd = fopen("chunklog.txt","a");
3542 }
3543 if (fd!=NULL) {
3544 fprintf(fd,"chunk %016"PRIX64": ivc=%"PRIu32" , dc=%"PRIu32" , vc=%"PRIu32" , bc=%"PRIu32" , wvc=%"PRIu32" , tdv=%"PRIu32" , tdb=%"PRIu32" , tdw=%"PRIu32" , goal=%"PRIu8" , scount=%"PRIu16"\n",c->chunkid,ivc,dc,vc,bc,wvc,tdc,tdb,tdw,c->goal,scount);
3545 cnt++;
3546 }
3547 if (cnt>100000) {
3548 fclose(fd);
3549 fd = NULL;
3550 cnt = 0;
3551 }
3552 }
3553 */
3554 // syslog(LOG_WARNING,"chunk %016"PRIX64": ivc=%"PRIu32" , dc=%"PRIu32" , vc=%"PRIu32" , bc=%"PRIu32" , wvc=%"PRIu32" , tdv=%"PRIu32" , tdb=%"PRIu32" , tdw=%"PRIu32" , goal=%"PRIu8" , scount=%"PRIu16,c->chunkid,ivc,dc,vc,bc,wvc,tdc,tdb,tdw,c->goal,scount);
3555
3556 // step 2. check number of copies
3557 if (tdc+vc+tdb+bc==0 && wvc+tdw>0 && c->fcount>0/* c->flisthead */) {
3558 if ((tdw+wvc)>=goal) {
3559 uint32_t bestversion;
3560 bestversion = 0;
3561 for (s=c->slisthead ; s ; s=s->next) {
3562 if (s->valid==WVER || s->valid==TDWVER) {
3563 if (s->version>=bestversion) {
3564 bestversion = s->version;
3565 }
3566 }
3567 }
3568 if (bestversion>0 && ((bestversion+1)==c->version || c->version+1==bestversion)) {
3569 syslog(LOG_WARNING,"chunk %016"PRIX64" has only invalid copies (%"PRIu32") - fixing it",c->chunkid,wvc+tdw);
3570 c->version = bestversion;
3571 for (s=c->slisthead ; s ; s=s->next) {
3572 if (s->version==bestversion && cstab[s->csid].valid) {
3573 if (s->valid == WVER) {
3574 s->valid = VALID;
3575 c->allvalidcopies++;
3576 c->regularvalidcopies++;
3577 } else if (s->valid == TDWVER) {
3578 s->valid = TDVALID;
3579 c->allvalidcopies++;
3580 }
3581 }
3582 if (s->valid == WVER || s->valid == TDWVER) {
3583 syslog(LOG_NOTICE,"chunk %016"PRIX64"_%08"PRIX32" - wrong versioned copy on (%s - ver:%08"PRIX32")",c->chunkid,c->version,matocsserv_getstrip(cstab[s->csid].ptr),s->version);
3584 } else {
3585 syslog(LOG_NOTICE,"chunk %016"PRIX64"_%08"PRIX32" - valid copy on (%s - ver:%08"PRIX32")",c->chunkid,c->version,matocsserv_getstrip(cstab[s->csid].ptr),s->version);
3586 }
3587 }
3588 chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,0,c->allvalidcopies,0,c->regularvalidcopies);
3589 c->needverincrease = 1;
3590 return;
3591 }
3592 }
3593 }
3594 if (tdc+vc+tdb+bc==0 && wvc+tdw+ivc>0 && c->fcount>0) {
3595 if (wvc+tdw==0) {
3596 syslog(LOG_WARNING,"chunk %016"PRIX64" has only invalid copies (%"PRIu32") - please repair it manually",c->chunkid,ivc);
3597 } else {
3598 syslog(LOG_WARNING,"chunk %016"PRIX64" has only copies with wrong versions (%"PRIu32") - please repair it manually",c->chunkid,wvc+tdw);
3599 }
3600 for (s=c->slisthead ; s ; s=s->next) {
3601 if (s->valid==INVALID) {
3602 syslog(LOG_NOTICE,"chunk %016"PRIX64"_%08"PRIX32" - invalid copy on (%s - ver:%08"PRIX32")",c->chunkid,c->version,matocsserv_getstrip(cstab[s->csid].ptr),s->version);
3603 } else {
3604 syslog(LOG_NOTICE,"chunk %016"PRIX64"_%08"PRIX32" - copy with wrong version on (%s - ver:%08"PRIX32")",c->chunkid,c->version,matocsserv_getstrip(cstab[s->csid].ptr),s->version);
3605 }
3606 }
3607 return;
3608 }
3609
3610 if (tdc+vc+tdb+bc+ivc+dc+wvc+tdw==0 && c->fcount>0) {
3611 syslog(LOG_NOTICE,"chunk %016"PRIX64"_%08"PRIX32": there are no copies",c->chunkid,c->version);
3612 return;
3613 }
3614 }
3615
3616 // step 3.0. delete invalid copies
3617 if (extrajob==0 && (tdc+vc+tdb+bc>0 || (c->fcount==0 && c->lockedto<now))) {
3618 for (s=c->slisthead ; s ; s=s->next) {
3619 if (matocsserv_deletion_counter(cstab[s->csid].ptr)<TmpMaxDel) {
3620 if (s->valid==WVER || s->valid==TDWVER || s->valid==INVALID || s->valid==DEL) {
3621 if (s->valid==DEL) {
3622 syslog(LOG_WARNING,"chunk %016"PRIX64"_%08"PRIX32": chunk hasn't been deleted since previous loop - retry",c->chunkid,c->version);
3623 }
3624 switch (s->valid) {
3625 case WVER:
3626 wvc--;
3627 break;
3628 case TDWVER:
3629 tdw--;
3630 break;
3631 case INVALID:
3632 ivc--;
3633 break;
3634 case DEL:
3635 dc--;
3636 break;
3637 }
3638 s->valid = DEL;
3639 dc++;
3640 stats_chunkops[CHUNK_OP_DELETE_TRY]++;
3641 matocsserv_send_deletechunk(cstab[s->csid].ptr,c->chunkid,0);
3642 inforec.done.del_invalid++;
3643 deldone++;
3644 }
3645 } else {
3646 if (s->valid==WVER || s->valid==TDWVER || s->valid==INVALID) {
3647 inforec.notdone.del_invalid++;
3648 delnotdone++;
3649 }
3650 }
3651 }
3652 }
3653
3654 // step 3.1. check for unfinished replications
3655 if (extrajob == 0) {
3656 if (c->operation==REPLICATE && c->lockedto<now) {
3657 syslog(LOG_WARNING,"chunk %016"PRIX64"_%08"PRIX32": chunk hasn't been replicated since previous loop - retry",c->chunkid,c->version);
3658 for (s=c->slisthead ; s ; s=s->next) {
3659 if (s->valid==TDBUSY || s->valid==BUSY) {
3660 if (s->valid==TDBUSY) {
3661 chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies-1,c->regularvalidcopies,c->regularvalidcopies);
3662 c->allvalidcopies--;
3663 tdb--;
3664 }
3665 if (s->valid==BUSY) {
3666 chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies-1,c->regularvalidcopies,c->regularvalidcopies-1);
3667 c->allvalidcopies--;
3668 c->regularvalidcopies--;
3669 bc--;
3670 }
3671 s->valid = INVALID;
3672 s->version = 0; // after unfinished operation can't be sure what version chunk has
3673 chunk_delopchunk(s->csid,c->chunkid);
3674 ivc++;
3675 }
3676 }
3677 c->operation = NONE;
3678 c->lockedto = 0;
3679 matoclserv_chunk_unlocked(c->chunkid,c);
3680 }
3681 }
3682
3683 // step 4. return if chunk is during some operation
3684 if (c->operation!=NONE || (c->lockedto>=now)) {
3685 if (extrajob == 0) {
3686 if (c->fcount==0) {
3687 inforec.locked_unused++;
3688 } else {
3689 inforec.locked_used++;
3690 if (goal > vc+bc && vc+tdc+bc+tdb > 0) {
3691 if (c->operation!=NONE) {
3692 syslog(LOG_NOTICE,"chunk %016"PRIX64"_%08"PRIX32": can't replicate chunk - operation %s in progress",c->chunkid,c->version,opstr[c->operation]);
3693 } else {
3694 syslog(LOG_NOTICE,"chunk %016"PRIX64"_%08"PRIX32": can't replicate chunk - chunk is being modified (locked for next %"PRIu32" second%s)",c->chunkid,c->version,(uint32_t)(1+c->lockedto-now),(c->lockedto==now)?"":"s");
3695 }
3696 }
3697 }
3698 }
3699 return ;
3700 }
3701
3702 // assert(c->lockedto < now && c->writeinprogress == 0)
3703
3704 // step 5. check busy count
3705 if ((bc+tdb)>0) {
3706 syslog(LOG_WARNING,"chunk %016"PRIX64"_%08"PRIX32" has unexpected BUSY copies",c->chunkid,c->version);
3707 return ;
3708 }
3709
3710 // step 6. delete unused chunk
3711
3712 if (extrajob==0 && c->fcount==0/* c->flisthead==NULL */) {
3713 // syslog(LOG_WARNING,"unused - delete");
3714 for (s=c->slisthead ; s ; s=s->next) {
3715 if (matocsserv_deletion_counter(cstab[s->csid].ptr)<TmpMaxDel) {
3716 if (s->valid==VALID || s->valid==TDVALID) {
3717 if (s->valid==TDVALID) {
3718 chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies-1,c->regularvalidcopies,c->regularvalidcopies);
3719 c->allvalidcopies--;
3720 } else {
3721 chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies-1,c->regularvalidcopies,c->regularvalidcopies-1);
3722 c->allvalidcopies--;
3723 c->regularvalidcopies--;
3724 }
3725 c->needverincrease = 1;
3726 s->valid = DEL;
3727 stats_chunkops[CHUNK_OP_DELETE_TRY]++;
3728 matocsserv_send_deletechunk(cstab[s->csid].ptr,c->chunkid,c->version);
3729 inforec.done.del_unused++;
3730 deldone++;
3731 }
3732 } else {
3733 if (s->valid==VALID || s->valid==TDVALID) {
3734 inforec.notdone.del_unused++;
3735 delnotdone++;
3736 }
3737 }
3738 }
3739 return ;
3740 }
3741
3742 // step 7.0. if chunk has enough valid copies and more than one copy with wrong version then delete all copies with wrong version
3743 if (extrajob==0 && vc >= goal && wvc>0) {
3744 for (s=c->slisthead ; s ; s=s->next) {
3745 if (s->valid == WVER) {
3746 if (matocsserv_deletion_counter(cstab[s->csid].ptr)<TmpMaxDel) {
3747 s->valid = DEL;
3748 stats_chunkops[CHUNK_OP_DELETE_TRY]++;
3749 matocsserv_send_deletechunk(cstab[s->csid].ptr,c->chunkid,0);
3750 wvc--;
3751 dc++;
3752 }
3753 }
3754 }
3755 }
3756
3757 // step 7.1. if chunk has too many copies then delete some of them
3758 if (vc > goal) {
3759 uint8_t prevdone;
3760 if (dservcount==0) {
3761 // dservcount = matocsserv_getservers_ordered(dcsids,AcceptableDifference/2.0,NULL,NULL);
3762 dservcount = matocsserv_getservers_ordered(dcsids);
3763 }
3764 // syslog(LOG_WARNING,"vc (%"PRIu32") > goal (%"PRIu32") - delete",vc,sclass_getgoal(c->sclassid));
3765 inforec.notdone.del_overgoal+=(vc-goal);
3766 delnotdone+=(vc-goal);
3767 prevdone = 1;
3768
3769 if ((DoNotUseSameIP | DoNotUseSameRack | LabelUniqueMask) || sclass_has_keeparch_labels(c->sclassid,c->archflag)) { // labels version
3770 servcnt = 0;
3771 for (i=0 ; i<dservcount ; i++) {
3772 for (s=c->slisthead ; s && s->csid!=dcsids[dservcount-1-i] ; s=s->next) {}
3773 if (s && s->valid==VALID) {
3774 servers[servcnt++] = s->csid;
3775 }
3776 }
3777 labelcnt = sclass_get_keeparch_labelmasks(c->sclassid,c->archflag,&labelmasks);
3778 matching = do_perfect_match(labelcnt,servcnt,labelmasks,servers);
3779 for (i=0 ; i<servcnt && vc>goal && prevdone ; i++) {
3780 if (matching[i+labelcnt]<0) {
3781 for (s=c->slisthead ; s && s->csid!=servers[i] ; s=s->next) {}
3782 if (s && s->valid==VALID) {
3783 if (matocsserv_deletion_counter(cstab[s->csid].ptr)<TmpMaxDel) {
3784 chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies-1,c->regularvalidcopies,c->regularvalidcopies-1);
3785 c->allvalidcopies--;
3786 c->regularvalidcopies--;
3787 c->needverincrease = 1;
3788 s->valid = DEL;
3789 stats_chunkops[CHUNK_OP_DELETE_TRY]++;
3790 matocsserv_send_deletechunk(cstab[s->csid].ptr,c->chunkid,0);
3791 inforec.done.del_overgoal++;
3792 inforec.notdone.del_overgoal--;
3793 deldone++;
3794 delnotdone--;
3795 vc--;
3796 dc++;
3797 } else {
3798 prevdone=0;
3799 chunk_priority_enqueue(DPRIORITY_OVERGOAL,c); // in such case only enqueue this chunk for future processing
3800 }
3801 }
3802 }
3803 }
3804 } else { // classic goal version
3805 for (i=0 ; i<dservcount && vc>goal && prevdone; i++) {
3806 for (s=c->slisthead ; s && s->csid!=dcsids[dservcount-1-i] ; s=s->next) {}
3807 if (s && s->valid==VALID) {
3808 if (matocsserv_deletion_counter(cstab[s->csid].ptr)<TmpMaxDel) {
3809 chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies-1,c->regularvalidcopies,c->regularvalidcopies-1);
3810 c->allvalidcopies--;
3811 c->regularvalidcopies--;
3812 c->needverincrease = 1;
3813 s->valid = DEL;
3814 stats_chunkops[CHUNK_OP_DELETE_TRY]++;
3815 matocsserv_send_deletechunk(cstab[s->csid].ptr,c->chunkid,0);
3816 inforec.done.del_overgoal++;
3817 inforec.notdone.del_overgoal--;
3818 deldone++;
3819 delnotdone--;
3820 vc--;
3821 dc++;
3822 } else {
3823 prevdone=0;
3824 chunk_priority_enqueue(DPRIORITY_OVERGOAL,c); // in such case only enqueue this chunk for future processing
3825 }
3826 }
3827 }
3828 }
3829 return;
3830 }
3831
3832 // step 7.2. if chunk has one copy on each server and some of them have status TDVALID then delete them
3833 if (extrajob==0 && vc+tdc>=scount && vc<goal && tdc>0 && vc+tdc>1 && chunks_priority_leng[DPRIORITY_ENDANGERED_HIGHGOAL]==0 && chunks_priority_leng[DPRIORITY_ENDANGERED]==0 && chunks_priority_leng[DPRIORITY_UNDERGOAL_MFR]==0) {
3834 uint8_t tdcr = 0;
3835 for (s=c->slisthead ; s ; s=s->next) {
3836 if (s->valid==TDVALID) {
3837 if (matocsserv_has_avail_space(cstab[s->csid].ptr)) {
3838 tdcr++;
3839 }
3840 }
3841 }
3842 if (vc+tdcr>=scount) {
3843 uint8_t prevdone;
3844 // syslog(LOG_WARNING,"vc+tdc (%"PRIu32") >= scount (%"PRIu32") and vc (%"PRIu32") < goal (%"PRIu32") and tdc (%"PRIu32") > 0 and vc+tdc > 1 - delete",vc+tdc,scount,vc,goal,tdc);
3845 prevdone = 0;
3846 for (s=c->slisthead ; s && prevdone==0 ; s=s->next) {
3847 if (s->valid==TDVALID) {
3848 if (matocsserv_has_avail_space(cstab[s->csid].ptr) && matocsserv_deletion_counter(cstab[s->csid].ptr)<TmpMaxDel) {
3849 chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies-1,c->regularvalidcopies,c->regularvalidcopies);
3850 c->allvalidcopies--;
3851 c->needverincrease = 1;
3852 s->valid = DEL;
3853 stats_chunkops[CHUNK_OP_DELETE_TRY]++;
3854 matocsserv_send_deletechunk(cstab[s->csid].ptr,c->chunkid,0);
3855 inforec.done.del_diskclean++;
3856 tdc--;
3857 dc++;
3858 prevdone = 1;
3859 } else {
3860 inforec.notdone.del_diskclean++;
3861 }
3862 }
3863 }
3864 if (prevdone) {
3865 return;
3866 }
3867 }
3868 }
3869
3870 // step 8. check matching for labeled chunks
3871 forcereplication = 0;
3872 if ((DoNotUseSameIP | DoNotUseSameRack | LabelUniqueMask) || sclass_has_keeparch_labels(c->sclassid,c->archflag)) {
3873 servcnt = 0;
3874 for (s=c->slisthead ; s ; s=s->next) {
3875 if (s->valid==VALID) {
3876 servers[servcnt++] = s->csid;
3877 }
3878 }
3879 labelcnt = sclass_get_keeparch_labelmasks(c->sclassid,c->archflag,&labelmasks);
3880 matching = do_perfect_match(labelcnt,servcnt,labelmasks,servers);
3881 for (i=0 ; i<labelcnt ; i++) {
3882 if (matching[i]<0) { // there are unmatched labels
3883 forcereplication++;
3884 }
3885 }
3886 }
3887
3888 // step 9. if chunk has number of copies less than goal then make another copy of this chunk
3889 if ((forcereplication || goal > vc) && vc+tdc > 0) {
3890 uint8_t canbefixed;
3891 canbefixed = 1;
3892 if (csdb_replicate_undergoals()) {
3893 // if ((csdb_getdisconnecttime()+ReplicationsDelayDisconnect)<now) {
3894 uint32_t rgvc,rgtdc;
3895 uint32_t lclass;
3896
3897 if (vc+tdc==1 && goal>2) { // highest priority - chunks with only one copy and high goal
3898 prilevel = DPRIORITY_ENDANGERED_HIGHGOAL;
3899 lclass = 0;
3900 } else if (vc+tdc==1 && goal==2) { // next priority - chunks with only one copy
3901 prilevel = DPRIORITY_ENDANGERED;
3902 lclass = 0;
3903 } else if (vc==1 && tdc>0) { // next priority - chunks on one regular disk and some "marked for removal" disks
3904 prilevel = DPRIORITY_UNDERGOAL_MFR;
3905 lclass = 1;
3906 } else if (tdc>0) { // next priority - chunks on "marked for removal" disks
3907 prilevel = DPRIORITY_MFR;
3908 lclass = 1;
3909 } else if (vc < goal) { // next priority - standard undergoal chunks
3910 prilevel = DPRIORITY_UNDERGOAL;
3911 lclass = 1;
3912 } else { // lowest priority - wrong labeled chunks
3913 prilevel = DPRIORITY_WRONGLABEL;
3914 lclass = 1;
3915 }
3916 if (extrajob==0) {
3917 for (i=0 ; i<prilevel ; i++) {
3918 if (chunks_priority_leng[i]>0) { // we have chunks with higher priority than current chunk
3919 chunk_priority_enqueue(prilevel,c); // in such case only enqueue this chunk for future processing
3920 if (prilevel<DPRIORITY_WRONGLABEL) {
3921 inforec.notdone.copy_undergoal++;
3922 } else {
3923 inforec.notdone.copy_wronglabels++;
3924 }
3925 return;
3926 }
3927 }
3928 }
3929
3930 matocsserv_get_server_groups(rcsids,MaxWriteRepl[lclass],servmaxpos);
3931 // rservcount = number of servers that are allowed to start new replication
3932 if (prilevel<DPRIORITY_UNDERGOAL_MFR) {
3933 rservcount = servmaxpos[CSSTATE_OVERLOADED];
3934 } else {
3935 rservcount = servmaxpos[CSSTATE_OK];
3936 }
3937 // rservcount = matocsserv_getservers_lessrepl(rcsids,MaxWriteRepl[lclass],(j<DPRIORITY_UNDERGOAL_MFR)?1:0,&allservflag);
3938 rgvc=0;
3939 rgtdc=0;
3940 vripcnt=0;
3941 for (s=c->slisthead ; s ; s=s->next) {
3942 if (matocsserv_replication_read_counter(cstab[s->csid].ptr,now)<MaxReadRepl[lclass]) {
3943 if (s->valid==VALID) {
3944 rgvc++;
3945 if (vripcnt<255 && ReplicationsRespectTopology>1) {
3946 if (matocsserv_get_csdata(cstab[s->csid].ptr,vrip+vripcnt,NULL,NULL,NULL)==0) {
3947 vripcnt++;
3948 }
3949 }
3950 } else if (s->valid==TDVALID) {
3951 rgtdc++;
3952 if (vripcnt<255 && ReplicationsRespectTopology>1) {
3953 if (matocsserv_get_csdata(cstab[s->csid].ptr,vrip+vripcnt,NULL,NULL,NULL)==0) {
3954 vripcnt++;
3955 }
3956 }
3957 }
3958 }
3959 }
3960 if (ReplicationsRespectTopology>1) {
3961 chunk_rack_sort(rcsids,rservcount,vrip,vripcnt);
3962 }
3963 if (rgvc+rgtdc>0 && rservcount>0) { // have at least one server to read from and at least one to write to
3964 if ((DoNotUseSameIP | DoNotUseSameRack | LabelUniqueMask) || sclass_has_keeparch_labels(c->sclassid,c->archflag)) { // labels version
3965 uint8_t sclass_mode;
3966 uint16_t maxlimited;
3967 uint16_t dstservcnt;
3968 // reverse order - do_perfect_match matches servers 'from right' - this is important when ReplicationsRespectTopology>1
3969 servcnt = 0;
3970 // first set all servers with limited write replication counter
3971 for (i=servmaxpos[CSSTATE_OVERLOADED] ; i<servmaxpos[CSSTATE_LIMIT_REACHED] ; i++) {
3972 for (s=c->slisthead ; s && s->csid!=rcsids[i] ; s=s->next) {}
3973 if (s==NULL) {
3974 servers[servcnt++] = rcsids[i];
3975 }
3976 }
3977 // then add overloaded servers (but only when they are not already included in rservcount)
3978 for (i=rservcount ; i<servmaxpos[CSSTATE_OVERLOADED] ; i++) {
3979 for (s=c->slisthead ; s && s->csid!=rcsids[i] ; s=s->next) {}
3980 if (s==NULL) {
3981 servers[servcnt++] = rcsids[i];
3982 }
3983 }
3984 maxlimited = servcnt;
3985 // then all valid servers in reverse order (matching from the 'right')
3986 for (i=0 ; i<rservcount ; i++) {
3987 for (s=c->slisthead ; s && s->csid!=rcsids[rservcount-1-i] ; s=s->next) {}
3988 if (s==NULL) {
3989 servers[servcnt++] = rcsids[rservcount-1-i];
3990 }
3991 }
3992 dstservcnt = servcnt;
3993 for (s=c->slisthead ; s ; s=s->next) {
3994 if (s->valid==VALID) {
3995 servers[servcnt++] = s->csid;
3996 }
3997 }
3998 labelcnt = sclass_get_keeparch_labelmasks(c->sclassid,c->archflag,&labelmasks);
3999 sclass_mode = sclass_get_mode(c->sclassid);
4000 matching = do_perfect_match(labelcnt,servcnt,labelmasks,servers);
4001 for (i=0 ; i<labelcnt ; i++) {
4002 int32_t servpos;
4003 if (matching[i]<(int32_t)labelcnt) {
4004 servpos=-1;
4005 } else {
4006 servpos=matching[i]-labelcnt;
4007 }
4008 // syslog(LOG_NOTICE,"labelpos: %u ; serverpos: %d ; serverip: %s ; serverlabels: %s",i,servpos,matocsserv_getstrip(cstab[servers[servpos]].ptr),matocsserv_server_get_labelstr(cstab[servers[servpos]].ptr));
4009 if (servpos<0) { // matched but only on 'no space' server (or totally unmatched)
4010 if (sclass_mode==SCLASS_MODE_STRICT) {
4011 canbefixed = 0;
4012 } else { // all other modes - labels matched to 'no space' servers only can be replicated anywhere
4013 for (j=maxlimited ; j<dstservcnt ; j++) { // check all possible destination servers
4014 if (matching[j+labelcnt]<0) { // matched to label? - if not then we can use it
4015 // syslog(LOG_NOTICE,"replicate to first available server (STD MODE and LOOSE MODE)");
4016 if (chunk_undergoal_replicate(c, servers[j], now, lclass, prilevel, &inforec, rgvc, rgtdc)>=0) {
4017 return;
4018 }
4019 }
4020 }
4021 }
4022 } else if (servpos<maxlimited) { // matched but only on 'busy' server
4023 if (sclass_mode==SCLASS_MODE_STRICT) {
4024 canbefixed = 0;
4025 } else if (sclass_mode==SCLASS_MODE_LOOSE) { // in loose mode labels matched only to overloaded servers
4026 for (j=maxlimited ; j<dstservcnt ; j++) { // check all possible destination servers
4027 if (matching[j+labelcnt]<0) { // matched to label? - if not then we can use it
4028 // syslog(LOG_NOTICE,"replicate to first available server (LOOSE MODE)");
4029 if (chunk_undergoal_replicate(c, servers[j], now, lclass, prilevel, &inforec, rgvc, rgtdc)>=0) {
4030 return;
4031 }
4032 }
4033 }
4034 } // in standard mode - we leave matching and then do not perform replication
4035 } else if (servpos<dstservcnt) { // can be replicated to correct label
4036 // syslog(LOG_NOTICE,"replicate to correct label (all modes)");
4037 if (chunk_undergoal_replicate(c, servers[servpos], now, lclass, prilevel, &inforec, rgvc, rgtdc)>=0) {
4038 return;
4039 }
4040 }
4041 }
4042 /*
4043 allowallservers = 0;
4044 if (scount<=rservcount) { // all servers can accept replication
4045 uint32_t unmatchedlabels;
4046 unmatchedlabels = 0;
4047 for (i=0 ; i<labelcnt ; i++) {
4048 if (matching[i]<0) {
4049 unmatchedlabels++;
4050 }
4051 }
4052 if (vc >= goal) { // not undergoal chunk
4053 if (unmatchedlabels >= forcereplication) { // can't fix wrong labels
4054 canbefixed = 0;
4055 }
4056 } else { // this is undergoal chunk
4057 if (goal>scount && vc==scount) {
4058 canbefixed = 0;
4059 } else if (unmatchedlabels>0) { // can't match all labels, so use all chunkservers
4060 allowallservers = 1;
4061 }
4062 }
4063 }
4064 */
4065 /*
4066 if (vc < sclass_getgoal(c->sclassid) && csdb_servers_count()<=rservcount) { // all servers can accept replications
4067 for (i=0 ; i<labelcnt ; i++) {
4068 if (matching[i]<0) {
4069 unmatchedlabels++;
4070 }
4071 }
4072 }
4073 */
4074 /* for (i=0 ; i<dstservcnt && canbefixed ; i++) {
4075 if (matching[i+labelcnt]>=0 || allowallservers) {
4076 if (chunk_undergoal_replicate(c, servers[i], now, lclass, prilevel, &inforec, rgvc, rgtdc)>=0) {
4077 return;
4078 }
4079 }
4080 }
4081 */
4082 } else { // classic goal version
4083 if (goal>servmaxpos[CSSTATE_LIMIT_REACHED] && vc==servmaxpos[CSSTATE_LIMIT_REACHED]) {
4084 canbefixed = 0;
4085 } else {
4086 for (i=0 ; i<rservcount ; i++) {
4087 for (s=c->slisthead ; s && s->csid!=rcsids[i] ; s=s->next) {}
4088 if (!s) {
4089 if (chunk_undergoal_replicate(c, rcsids[i], now, lclass, prilevel, &inforec, rgvc, rgtdc)>=0) {
4090 return;
4091 }
4092 }
4093 }
4094 }
4095 }
4096 }
4097 if (canbefixed) { // enqueue only chunks which can be fixed and only if there are servers which reached replication limits
4098 chunk_priority_enqueue(prilevel,c);
4099 }
4100 }
4101 if (vc < goal) {
4102 inforec.notdone.copy_undergoal++;
4103 } else {
4104 if (canbefixed==0) {
4105 inforec.labels_dont_match++;
4106 } else {
4107 inforec.notdone.copy_wronglabels++;
4108 }
4109 }
4110 }
4111
4112 if (extrajob) { // do not rebalane doing "extra" jobs.
4113 return;
4114 }
4115
4116 if (fullservers==0) {
4117 uint8_t queues_empty;
4118 queues_empty = 1;
4119 for (i=0 ; i<DANGER_PRIORITIES ; i++) {
4120 if (chunks_priority_leng[i]>0) {
4121 if (i<REPLICATION_DANGER_PRIORITIES) {
4122 return; // we have pending undergaal/wronglabeled chunks, so ignore chunkserver rebalance
4123 } else {
4124 queues_empty = 0;
4125 }
4126 }
4127 }
4128 if (queues_empty) {
4129 if (c->ondangerlist) {
4130 syslog(LOG_NOTICE,"chunk %016"PRIX64"_%08"PRIX32": fixing 'ondangerlist' flag",c->chunkid,c->version);
4131 c->ondangerlist = 0;
4132 }
4133 }
4134 }
4135
4136 if (goal == vc && vc+tdc>0) {
4137 double maxdiff;
4138 uint8_t sclass_mode;
4139 uint8_t need_label_version;
4140
4141 // need_label_version = ((DoNotUseSameIP | DoNotUseSameRack | LabelUniqueMask) || sclass_has_keeparch_labels(c->sclassid,c->archflag))?1:0;
4142 need_label_version = sclass_has_keeparch_labels(c->sclassid,c->archflag)?1:0;
4143
4144 servcnt = 0;
4145 for (s=c->slisthead ; s ; s=s->next) {
4146 if (s->valid==VALID) {
4147 servers[servcnt++] = s->csid;
4148 }
4149 }
4150 extraservcnt = servcnt;
4151 for (s=c->slisthead ; s ; s=s->next) {
4152 if (s->valid!=VALID) {
4153 servers[extraservcnt++] = s->csid;
4154 }
4155 }
4156
4157 if (need_label_version) {
4158 labelcnt = sclass_get_keeparch_labelmasks(c->sclassid,c->archflag,&labelmasks);
4159 sclass_mode = sclass_get_mode(c->sclassid);
4160 matching = do_perfect_match(labelcnt,servcnt,labelmasks,servers);
4161 } else {
4162 // set something to silence potential compiler warnings
4163 labelcnt = goal;
4164 sclass_mode = SCLASS_MODE_LOOSE;
4165 matching = NULL;
4166 }
4167
4168 if (dservcount==0) {
4169 dservcount = matocsserv_getservers_ordered(dcsids);
4170 }
4171
4172 srccsid = MAXCSCOUNT;
4173 dstcsid = MAXCSCOUNT;
4174 maxdiff = 0.0;
4175
4176 for (i=0 ; i<servcnt ; i++) {
4177 uint32_t *labelmask;
4178 uint8_t anyunmatched;
4179 double srcusage,dstusage;
4180 uint8_t lclass;
4181
4182 labelmask = NULL;
4183 anyunmatched = 0;
4184 if (need_label_version) {
4185 if (matching[labelcnt+i]>=0) {
4186 labelmask = labelmasks[matching[labelcnt+i]];
4187 } else if (sclass_mode!=SCLASS_MODE_LOOSE) {
4188 anyunmatched = 1;
4189 // in normal and strict mode you can use any definition that do not match any copy (it will even fix wrong labels)
4190 }
4191 }
4192 srcusage = matocsserv_get_usage(cstab[servers[i]].ptr);
4193 repl_read_counter = matocsserv_replication_read_counter(cstab[servers[i]].ptr,now);
4194 if (repl_read_counter < MaxReadRepl[2] || repl_read_counter < MaxReadRepl[3]) { // here accept any rebalance limit
4195 for (j=0 ; j<dservcount ; j++) {
4196 dstusage = matocsserv_get_usage(cstab[dcsids[j]].ptr);
4197 if (srcusage - dstusage < maxdiff) { // already found better src,dst pair
4198 break;
4199 }
4200 if (((srcusage - dstusage) <= AcceptableDifference) && last_rebalance+(0.01/(srcusage-dstusage))>=now) { // when difference is small then do not hurry
4201 break;
4202 }
4203 k = 0;
4204 while (k<extraservcnt) {
4205 if (servers[k]==dcsids[j]) {
4206 break;
4207 }
4208 if (DoNotUseSameIP) {
4209 if (matocsserv_server_get_ip(cstab[servers[k]].ptr)==matocsserv_server_get_ip(cstab[dcsids[j]].ptr)) {
4210 break;
4211 }
4212 } else if (DoNotUseSameRack) {
4213 if (topology_get_rackid(matocsserv_server_get_ip(cstab[servers[k]].ptr))==topology_get_rackid(matocsserv_server_get_ip(cstab[dcsids[j]].ptr))) {
4214 break;
4215 }
4216 } else if (LabelUniqueMask) {
4217 if ((matocsserv_server_get_labelmask(cstab[servers[k]].ptr) & LabelUniqueMask) == (matocsserv_server_get_labelmask(cstab[dcsids[j]].ptr) & LabelUniqueMask)) {
4218 break;
4219 }
4220 }
4221 k++;
4222 }
4223 if (k<extraservcnt) { // one of existing copies is on this server or this server has same IP/RACKID/UNIQLABELS
4224 continue;
4225 }
4226 if (anyunmatched) { // check if this server match any of unmatched labels
4227 for (k=0 ; k<labelcnt ; k++) {
4228 if (matching[k]<0) {
4229 if (matocsserv_server_has_labels(cstab[dcsids[j]].ptr,labelmasks[k])) {
4230 break;
4231 }
4232 }
4233 }
4234 if (k==labelcnt) { // nope
4235 break;
4236 }
4237 }
4238 if (labelmask==NULL || matocsserv_server_has_labels(cstab[dcsids[j]].ptr,labelmask)) { // if we have specific label then check if this server has them
4239 if ((srcusage - dstusage) > AcceptableDifference*1.5) { // now we know usage difference, so we can set proper limit class
4240 lclass = 3;
4241 } else {
4242 lclass = 2;
4243 }
4244 if (repl_read_counter < MaxReadRepl[lclass] && matocsserv_replication_write_counter(cstab[dcsids[j]].ptr,now)<MaxWriteRepl[lclass]) {
4245 maxdiff = srcusage - dstusage;
4246 dstcsid = dcsids[j];
4247 srccsid = servers[i];
4248 }
4249 }
4250 }
4251 }
4252 }
4253
4254 if (dstcsid!=MAXCSCOUNT && srccsid!=MAXCSCOUNT) {
4255 stats_chunkops[CHUNK_OP_REPLICATE_TRY]++;
4256 matocsserv_send_replicatechunk(cstab[dstcsid].ptr,c->chunkid,c->version,cstab[srccsid].ptr);
4257 c->needverincrease = 1;
4258 inforec.copy_rebalance++;
4259 last_rebalance = now;
4260 }
4261
4262 return;
4263 }
4264 }
4265
chunk_labelset_can_be_fulfilled(uint8_t labelcnt,uint32_t ** labelmasks)4266 uint8_t chunk_labelset_can_be_fulfilled(uint8_t labelcnt,uint32_t **labelmasks) {
4267 static uint16_t *stdcsids = NULL;
4268 static uint16_t *olcsids = NULL;
4269 static uint16_t *allcsids = NULL;
4270 static uint16_t stdcscnt;
4271 static uint16_t olcscnt;
4272 static uint16_t allcscnt;
4273 uint8_t r;
4274 uint32_t i;
4275 int32_t *matching;
4276
4277 if (stdcsids==NULL) {
4278 stdcsids = malloc(sizeof(uint16_t)*MAXCSCOUNT);
4279 passert(stdcsids);
4280 }
4281 if (olcsids==NULL) {
4282 olcsids = malloc(sizeof(uint16_t)*MAXCSCOUNT);
4283 passert(olcsids);
4284 }
4285 if (allcsids==NULL) {
4286 allcsids = malloc(sizeof(uint16_t)*MAXCSCOUNT);
4287 passert(allcsids);
4288 }
4289
4290 if (labelcnt==0 || labelmasks==NULL) {
4291 matocsserv_getservers_test(&stdcscnt,stdcsids,&olcscnt,olcsids,&allcscnt,allcsids);
4292 }
4293
4294 matching = do_perfect_match(labelcnt,stdcscnt,labelmasks,stdcsids);
4295
4296 r = 1;
4297 for (i=0 ; i<labelcnt ; i++) {
4298 if (matching[i]<0) {
4299 r = 0;
4300 break;
4301 }
4302 }
4303 if (r==1) {
4304 return 3; // can be fulfilled
4305 }
4306
4307 if (olcsids > stdcsids) {
4308 matching = do_perfect_match(labelcnt,olcscnt,labelmasks,olcsids);
4309
4310 r = 1;
4311 for (i=0 ; i<labelcnt ; i++) {
4312 if (matching[i]<0) {
4313 r = 0;
4314 break;
4315 }
4316 }
4317 if (r==1) {
4318 return 2; // can be fulfilled using overloaded servers
4319 }
4320 }
4321
4322 if (allcsids > olcsids) {
4323 matching = do_perfect_match(labelcnt,allcscnt,labelmasks,allcsids);
4324
4325 r = 1;
4326 for (i=0 ; i<labelcnt ; i++) {
4327 if (matching[i]<0) {
4328 r = 0;
4329 break;
4330 }
4331 }
4332 if (r==1) {
4333 return 1; // can be fulfilled using servers with no space available
4334 }
4335 }
4336
4337 return 0;
4338 }
4339
chunk_clean_priority_queues(void)4340 static inline void chunk_clean_priority_queues(void) {
4341 uint32_t j,l;
4342 for (j=0 ; j<DANGER_PRIORITIES ; j++) {
4343 for (l=0 ; l<DangerMaxLeng ; l++) {
4344 if (chunks_priority_queue[j][l]!=NULL) {
4345 chunks_priority_queue[j][l]->ondangerlist = 0;
4346 }
4347 chunks_priority_queue[j][l] = NULL;
4348 }
4349 chunks_priority_head[j] = 0;
4350 chunks_priority_tail[j] = 0;
4351 chunks_priority_leng[j] = 0;
4352 }
4353 }
4354
chunk_unlock(uint64_t chunkid)4355 int chunk_unlock(uint64_t chunkid) {
4356 chunk *c;
4357
4358 c = chunk_find(chunkid);
4359 if (c==NULL) {
4360 return MFS_ERROR_NOCHUNK;
4361 }
4362 c->lockedto = 0;
4363 chunk_write_counters(c,0);
4364 chunk_priority_queue_check(c,1);
4365 if (c->ondangerlist) {
4366 chunk_do_jobs(c,matocsserv_servers_count(),matocsserv_almostfull_servers(),main_time(),1);
4367 } else {
4368 matoclserv_chunk_unlocked(c->chunkid,c);
4369 }
4370 return MFS_STATUS_OK;
4371 }
4372
chunk_mr_unlock(uint64_t chunkid)4373 int chunk_mr_unlock(uint64_t chunkid) {
4374 chunk *c;
4375 c = chunk_find(chunkid);
4376 if (c==NULL) {
4377 return MFS_ERROR_NOCHUNK;
4378 }
4379 c->lockedto = 0;
4380 chunk_write_counters(c,0);
4381 return MFS_STATUS_OK;
4382 }
4383
4384
/*
 * Periodic chunk maintenance entry point (registered in chunk_strinit to run
 * every 1000/TICKSPERSECOND milliseconds).
 *
 * Each call:
 *  1. runs the chunkserver disconnection loop and the per-server delay check,
 *  2. returns early when counters are still being calculated, when the initial
 *     replication delay has not passed yet, when there are no chunkservers or
 *     when the chunk hash table is empty,
 *  3. runs the "every tick" job,
 *  4. drains the priority queues (at most HashCPTMax chunks in total per call),
 *  5. walks a part of the chunk hash table, deleting unused chunks and running
 *     regular jobs for all other chunks (again limited by HashCPTMax).
 */
void chunk_jobs_main(void) {
	uint32_t i,j,l,h,t,lc,hashsteps;
	uint16_t scount,csid;
	uint16_t fullservers;
	chunk *c,*cn;
	uint32_t now;
#ifdef MFSDEBUG
	static uint32_t lastsecond=0;
#endif

	chunk_server_disconnection_loop();
	chunk_server_check_delays();

	if (chunk_counters_in_progress()) {
		return;
	}
	// do nothing until ReplicationsDelayInit seconds have passed since chunk_strinit
	if (starttime+ReplicationsDelayInit>main_time()) {
		return;
	}

	// full servers are not included here
	scount = matocsserv_servers_count();

	if (scount==0 || chunkrehashpos==0) {
		return;
	}

	fullservers = matocsserv_almostfull_servers();

	now = main_time();
	chunk_do_jobs(NULL,JOBS_EVERYTICK,0,now,0);

	// first serve some endangered and undergoal chunks
	lc = 0; // number of chunks processed so far in this tick (shared by all priority queues)
	for (j=0 ; j<DANGER_PRIORITIES ; j++) {
		// consistency check of the circular queue: tail+leng (mod size) has to point at head
		if (((chunks_priority_tail[j]+chunks_priority_leng[j])%DangerMaxLeng) != chunks_priority_head[j]) {
			syslog(LOG_NOTICE,"danger_priority_group %"PRIu32": serious structure error, head: %"PRIu32"; tail: %"PRIu32"; leng: %"PRIu32,j,chunks_priority_head[j],chunks_priority_tail[j],+chunks_priority_leng[j]);
			// queue is broken - drop its whole content and start with an empty one
			for (l=0 ; l<DangerMaxLeng ; l++) {
				if (chunks_priority_queue[j][l]!=NULL) {
					chunks_priority_queue[j][l]->ondangerlist = 0;
				}
				chunks_priority_queue[j][l] = NULL;
			}
			chunks_priority_head[j] = 0;
			chunks_priority_tail[j] = 0;
			chunks_priority_leng[j] = 0;
		}
#ifdef MFSDEBUG
		l = chunks_priority_leng[j];
#endif
		if (lc > (1.0 - chunks_priority_mincpsperc[j])*HashCPTMax) { // prevent starvation of lowest priority chunks by highest priority chunks
			lc = HashCPTMax * (1.0 - chunks_priority_mincpsperc[j]);
		}
		if (chunks_priority_leng[j]>0 && lc<HashCPTMax) {
			h = chunks_priority_head[j];
			t = chunks_priority_tail[j];
			// do-while (not while) - a completely full queue has tail equal to head
			do {
				c = chunks_priority_queue[j][t];
				chunks_priority_queue[j][t] = NULL;
				t = (t+1)%DangerMaxLeng;
				chunks_priority_tail[j] = t;
				chunks_priority_leng[j]--;
				if (c!=NULL) { // slots can be empty (NULL) - such entries are skipped and not counted in 'lc'
					c->ondangerlist = 0;
					chunk_do_jobs(c,scount,fullservers,now,1);
					lc++;
				}
			} while (t!=h && lc<HashCPTMax);
		}
#ifdef MFSDEBUG
		if (now!=lastsecond) {
			syslog(LOG_NOTICE,"danger_priority_group %"PRIu32": %"PRIu32"->%"PRIu32,j,l,chunks_priority_leng[j]);
		}
#endif
	}
#ifdef MFSDEBUG
	lastsecond=now;
#endif

	// then serve standard chunks
	lc = 0;
	// number of hash positions to visit in this tick - chosen so that the whole table is walked in about LoopTimeMin seconds
	hashsteps = 1+((chunkrehashpos)/(LoopTimeMin*TICKSPERSECOND));
	for (i=0 ; i<hashsteps && lc<HashCPTMax ; i++) {
		if (jobshcnt>=chunkrehashpos) {
			// whole table has been visited - finish this loop and prepare the next one
			chunk_do_jobs(NULL,JOBS_EVERYLOOP,0,now,0); // every loop tasks
			jobshpos = 0;
			jobshcnt = 0;
			jobshmax = chunkrehashpos;
			// every loop uses a different stride (1,16,256,...) to visit hash positions in a different order
			jobshstep *= 16;
			if (jobshstep==0 || jobshstep>=jobshmax) {
				jobshstep = 1;
			} else {
				// stride is a power of two here, so an odd modulus makes "pos = (pos+step) % max" visit every position below max
				if ((jobshmax&1)==0) {
					jobshmax--;
				}
			}
			// advance 'marked for removal' state of every used chunkserver slot by one loop
			for (csid = csusedhead ; csid < MAXCSCOUNT ; csid = cstab[csid].next) {
				switch (cstab[csid].mfr_state) {
					case CAN_BE_REMOVED:
					case UNKNOWN_SOFT:
					case WAS_IN_PROGRESS:
						cstab[csid].mfr_state = CAN_BE_REMOVED;
						break;
					case REPL_IN_PROGRESS:
						cstab[csid].mfr_state = WAS_IN_PROGRESS;
						break;
					/* case UNKNOWN_HARD: */
					default:
						cstab[csid].mfr_state = UNKNOWN_SOFT;
						break;
				}
			}
		} else {
			c = chunkhashtab[jobshpos>>HASHTAB_LOBITS][jobshpos&HASHTAB_MASK];
			while (c) {
				cn = c->next; // remember successor - 'c' may be deleted below
				// chunk which is not locked, has no copies, is not used by any file and is not queued can be deleted (only when counters are ready and all servers are known)
				if (c->lockedto<(uint32_t)main_time() && c->slisthead==NULL && c->fcount==0 && c->ondangerlist==0 && chunk_counters_in_progress()==0 && csdb_have_all_servers()) {
					changelog("%"PRIu32"|CHUNKDEL(%"PRIu64",%"PRIu32")",main_time(),c->chunkid,c->version);
					chunk_delete(c);
				} else {
					chunk_do_jobs(c,scount,fullservers,now,0);
					lc++;
				}
				c = cn;
			}
			jobshcnt++;
			if (jobshcnt<jobshmax) {
				jobshpos += jobshstep;
				jobshpos %= jobshmax;
			} else {
				// positions from jobshmax up (at most one when jobshmax was decremented) are visited sequentially
				jobshpos = jobshcnt;
			}
		}
	}
}
4520
4521 /* ---- */
4522
4523 #define CHUNKFSIZE 17
4524 /*
4525 void chunk_text_dump(FILE *fd) {
4526 chunk *c;
4527 uint32_t i,lockedto,now;
4528 now = main_time();
4529
4530 for (i=0 ; i<chunkrehashpos ; i++) {
4531 for (c=chunkhashtab[i>>HASHTAB_LOBITS][i&HASHTAB_MASK] ; c ; c=c->next) {
4532 lockedto = c->lockedto;
4533 if (lockedto<now) {
4534 lockedto = 0;
4535 }
4536 fprintf(fd,"*|i:%016"PRIX64"|v:%08"PRIX32"|g:%"PRIu8"|t:%10"PRIu32"\n",c->chunkid,c->version,c->sclassid,lockedto);
4537 }
4538 }
4539 }
4540 */
/*
 * Load the chunk section of a metadata file.
 *
 * Layout: 8-byte header (next chunk id) followed by fixed size records
 * (chunkid:64, version:32, lockedto:32 and - unless mver is 0x10 - archflag:8).
 * A record with chunkid equal to zero ends the section; all its other fields
 * have to be zero as well.
 *
 * Returns 0 on success and -1 on read error or malformed end record.
 */
int chunk_load(bio *fd,uint8_t mver) {
	uint8_t header[8];
	uint8_t record[CHUNKFSIZE];
	const uint8_t *rptr;
	int32_t got,reclen;
	uint8_t oldformat;
	chunk *c;
	// chunkdata
	uint64_t chunkid;
	uint32_t version,lockedto;
	uint8_t archflag;

	chunks=0;
	if (bio_read(fd,header,8)!=8) {
		syslog(LOG_WARNING,"chunks: can't read header");
		return -1;
	}
	rptr = header;
	nextchunkid = get64bit(&rptr);

	oldformat = (mver==0x10)?1:0; // version 0x10 records have no archflag byte
	reclen = oldformat?16:CHUNKFSIZE;

	for (;;) {
		got = bio_read(fd,record,reclen);
		if (got!=reclen) {
			break;
		}
		rptr = record;
		chunkid = get64bit(&rptr);
		version = get32bit(&rptr);
		lockedto = get32bit(&rptr);
		archflag = 0;
		if (!oldformat) {
			archflag = get8bit(&rptr);
		}
		if (chunkid==0) { // end marker
			if (version==0 && lockedto==0 && archflag==0) {
				return 0;
			}
			syslog(LOG_WARNING,"chunks: wrong ending - chunk zero with version: %"PRIu32" and locked to: %"PRIu32,version,lockedto);
			return -1;
		}
		c = chunk_new(chunkid);
		c->version = version;
		c->lockedto = lockedto;
		c->archflag = archflag;
	}
	// short read - section ended without the end marker
	syslog(LOG_WARNING,"chunks: read error");
	return -1;
}
4591
/*
 * Store all chunks in the metadata file.
 *
 * Writes 8-byte header (next chunk id), then one CHUNKFSIZE-byte record per chunk
 * (chunkid:64, version:32, lockedto:32, archflag:8) and finally an all-zero record
 * as the end marker. Locks which already expired are stored as zero.
 *
 * Called with fd==NULL only returns 0x11 (presumably the version of this section
 * format - chunk_load treats 0x10 as the older, shorter one).
 * Returns 0 on success and 0xFF on write error.
 */
uint8_t chunk_store(bio *fd) {
	uint8_t header[8];
	uint8_t record[CHUNKFSIZE];
	uint8_t *wptr;
	uint32_t hpos;
	uint32_t lockedto,now;
	chunk *c;

	if (fd==NULL) {
		return 0x11;
	}
	now = main_time();

	wptr = header;
	put64bit(&wptr,nextchunkid);
	if (bio_write(fd,header,8)!=8) {
		return 0xFF;
	}

	for (hpos=0 ; hpos<chunkrehashpos ; hpos++) {
		c = chunkhashtab[hpos>>HASHTAB_LOBITS][hpos&HASHTAB_MASK];
		while (c!=NULL) {
			lockedto = c->lockedto;
			if (lockedto<now) { // expired lock
				lockedto = 0;
			}
			wptr = record;
			put64bit(&wptr,c->chunkid);
			put32bit(&wptr,c->version);
			put32bit(&wptr,lockedto);
			put8bit(&wptr,c->archflag);
			if (bio_write(fd,record,CHUNKFSIZE)!=CHUNKFSIZE) {
				return 0xFF;
			}
			c = c->next;
		}
	}

	// end marker
	memset(record,0,CHUNKFSIZE);
	if (bio_write(fd,record,CHUNKFSIZE)!=CHUNKFSIZE) {
		return 0xFF;
	}
	return 0;
}
4638
chunk_cleanup(void)4639 void chunk_cleanup(void) {
4640 uint32_t i,j;
4641 discserv *ds;
4642 // slist_bucket *sb,*sbn;
4643 // chunk_bucket *cb,*cbn;
4644
4645 chunk_clean_priority_queues();
4646 while (discservers) {
4647 ds = discservers;
4648 discservers = discservers->next;
4649 matocsserv_disconnection_finished(cstab[ds->csid].ptr);
4650 free(ds);
4651 }
4652 while (discservers_next) {
4653 ds = discservers_next;
4654 discservers_next = discservers_next->next;
4655 matocsserv_disconnection_finished(cstab[ds->csid].ptr);
4656 free(ds);
4657 }
4658 slist_free_all();
4659 // for (sb = sbhead ; sb ; sb = sbn) {
4660 // sbn = sb->next;
4661 // free(sb);
4662 // }
4663 // sbhead = NULL;
4664 // slfreehead = NULL;
4665 chunk_free_all();
4666 // for (cb = cbhead ; cb ; cb = cbn) {
4667 // cbn = cb->next;
4668 // free(cb);
4669 // }
4670 // cbhead = NULL;
4671 // chfreehead = NULL;
4672 chunk_hash_cleanup();
4673 // for (i=0 ; i<HASHSIZE ; i++) {
4674 // chunkhash[i] = NULL;
4675 // }
4676 for (i=0 ; i<MAXCSCOUNT ; i++) {
4677 cstab[i].next = i+1;
4678 cstab[i].prev = i-1;
4679 cstab[i].valid = 0;
4680 cstab[i].registered = 0;
4681 cstab[i].newchunkdelay = 0;
4682 cstab[i].lostchunkdelay = 0;
4683 }
4684 cstab[0].prev = MAXCSCOUNT;
4685 csfreehead = 0;
4686 csfreetail = MAXCSCOUNT-1;
4687 csusedhead = MAXCSCOUNT;
4688 for (i=0 ; i<MAXSCLASS*2 ; i++) {
4689 for (j=0 ; j<11 ; j++) {
4690 allchunkcounts[i][j]=0;
4691 regularchunkcounts[i][j]=0;
4692 }
4693 }
4694 }
4695
chunk_newfs(void)4696 void chunk_newfs(void) {
4697 chunks = 0;
4698 nextchunkid = 1;
4699 }
4700
/*
 * Parse replication limits given as text.
 *
 * Accepted formats (white space around numbers is ignored):
 *   "N"        - single number, used for all four limits
 *   "A,B,C,D"  - four comma separated numbers
 *
 * strlist - zero terminated string to parse (not modified)
 * replist - table of four doubles; written only when parsing succeeds
 *
 * Returns:
 *    0 - single number format ("old format")
 *    1 - four numbers format
 *   -1 - parse error (NULL argument, missing number, wrong separator,
 *        wrong number of elements or trailing garbage); replist is left untouched
 */
int chunk_parse_rep_list(char *strlist,double *replist) {
	char *p,*endp;
	uint32_t i,cnt;
	double reptmp[4];

	if (strlist==NULL || replist==NULL) {
		return -1;
	}

	p = strlist;
	for (cnt=0 ; cnt<4 ; cnt++) {
		if (cnt>0) { // every number but the first one has to be preceded by a comma
			if (*p!=',') {
				return -1;
			}
			p++;
		}
		while (*p==' ' || *p=='\t' || *p=='\r' || *p=='\n') {
			p++;
		}
		reptmp[cnt] = strtod(p,&endp);
		if (endp==p) { // nothing has been converted - empty field must not be silently treated as zero
			return -1;
		}
		p = endp;
		while (*p==' ' || *p=='\t' || *p=='\r' || *p=='\n') {
			p++;
		}
		if (cnt==0 && *p==0) { // single number format
			for (i=0 ; i<4 ; i++) {
				replist[i] = reptmp[0];
			}
			return 0;
		}
	}
	if (*p!=0) { // garbage after the fourth number
		return -1;
	}
	for (i=0 ; i<4 ; i++) {
		replist[i] = reptmp[i];
	}
	return 1;
}
4743
chunk_term(void)4744 void chunk_term(void) {
4745 uint32_t i;
4746 chunk_priority_queue_check(NULL,1); // free tabs
4747 chunk_do_jobs(NULL,JOBS_TERM,0,main_time(),0); // free tabs
4748 for (i=0 ; i<MAXSCLASS*2 ; i++) {
4749 free(allchunkcounts[i]);
4750 free(regularchunkcounts[i]);
4751 }
4752 free(allchunkcounts);
4753 free(regularchunkcounts);
4754 }
4755
chunk_load_cfg_common(void)4756 void chunk_load_cfg_common(void) {
4757 uint32_t uniqmode;
4758
4759 uniqmode = cfg_getuint32("CHUNKS_UNIQUE_MODE",0);
4760
4761 DoNotUseSameIP=0;
4762 DoNotUseSameRack=0;
4763 LabelUniqueMask=0;
4764
4765 if (uniqmode==1) {
4766 DoNotUseSameIP=1;
4767 } else if (uniqmode==2) {
4768 DoNotUseSameRack=1;
4769 }
4770
4771 ReplicationsDelayInit = cfg_getuint32("REPLICATIONS_DELAY_INIT",60);
4772 ReplicationsRespectTopology = cfg_getuint8("REPLICATIONS_RESPECT_TOPOLOGY",0);
4773 CreationsRespectTopology = cfg_getuint32("CREATIONS_RESPECT_TOPOLOGY",0);
4774
4775 if (cfg_isdefined("ACCEPTABLE_PERCENTAGE_DIFFERENCE")) {
4776 AcceptableDifference = cfg_getdouble("ACCEPTABLE_PERCENTAGE_DIFFERENCE",1.0)/100.0; // 1%
4777 } else {
4778 AcceptableDifference = cfg_getdouble("ACCEPTABLE_DIFFERENCE",0.01); // 1% - deprecated option
4779 }
4780 if (AcceptableDifference<0.001) { // 0.1%
4781 AcceptableDifference = 0.001;
4782 }
4783 if (AcceptableDifference>0.1) { // 10%
4784 AcceptableDifference = 0.1;
4785 }
4786
4787 DangerMaxLeng = cfg_getuint32("PRIORITY_QUEUES_LENGTH",1000000);
4788 if (DangerMaxLeng<10000) {
4789 DangerMaxLeng = 10000;
4790 }
4791 }
4792
chunk_reload(void)4793 void chunk_reload(void) {
4794 uint32_t oldMaxDelSoftLimit,oldMaxDelHardLimit,oldDangerMaxLeng;
4795 uint32_t cps,i,j;
4796 char *repstr;
4797
4798 oldDangerMaxLeng = DangerMaxLeng;
4799
4800 oldMaxDelSoftLimit = MaxDelSoftLimit;
4801 oldMaxDelHardLimit = MaxDelHardLimit;
4802
4803 chunk_load_cfg_common();
4804
4805 MaxDelSoftLimit = cfg_getuint32("CHUNKS_SOFT_DEL_LIMIT",10);
4806 if (cfg_isdefined("CHUNKS_HARD_DEL_LIMIT")) {
4807 MaxDelHardLimit = cfg_getuint32("CHUNKS_HARD_DEL_LIMIT",25);
4808 if (MaxDelHardLimit<MaxDelSoftLimit) {
4809 MaxDelSoftLimit = MaxDelHardLimit;
4810 syslog(LOG_WARNING,"CHUNKS_SOFT_DEL_LIMIT is greater than CHUNKS_HARD_DEL_LIMIT - using CHUNKS_HARD_DEL_LIMIT for both");
4811 }
4812 } else {
4813 MaxDelHardLimit = 3 * MaxDelSoftLimit;
4814 }
4815 if (MaxDelSoftLimit==0) {
4816 MaxDelSoftLimit = oldMaxDelSoftLimit;
4817 MaxDelHardLimit = oldMaxDelHardLimit;
4818 }
4819 if (TmpMaxDelFrac<MaxDelSoftLimit) {
4820 TmpMaxDelFrac = MaxDelSoftLimit;
4821 }
4822 if (TmpMaxDelFrac>MaxDelHardLimit) {
4823 TmpMaxDelFrac = MaxDelHardLimit;
4824 }
4825 if (TmpMaxDel<MaxDelSoftLimit) {
4826 TmpMaxDel = MaxDelSoftLimit;
4827 }
4828 if (TmpMaxDel>MaxDelHardLimit) {
4829 TmpMaxDel = MaxDelHardLimit;
4830 }
4831
4832
4833 repstr = cfg_getstr("CHUNKS_WRITE_REP_LIMIT","2,1,1,4");
4834 switch (chunk_parse_rep_list(repstr,MaxWriteRepl)) {
4835 case -1:
4836 syslog(LOG_WARNING,"write replication limit parse error !!!");
4837 break;
4838 case 0:
4839 syslog(LOG_NOTICE,"write replication limit in old format - change limits to new format");
4840 }
4841 free(repstr);
4842 repstr = cfg_getstr("CHUNKS_READ_REP_LIMIT","10,5,2,5");
4843 switch (chunk_parse_rep_list(repstr,MaxReadRepl)) {
4844 case -1:
4845 syslog(LOG_WARNING,"read replication limit parse error !!!");
4846 break;
4847 case 0:
4848 syslog(LOG_NOTICE,"read replication limit in old format - change limits to new format");
4849 }
4850 free(repstr);
4851 /*
4852 repl = cfg_getuint32("CHUNKS_WRITE_REP_LIMIT",2);
4853 if (repl>0) {
4854 MaxWriteRepl = repl;
4855 }
4856
4857
4858 repl = cfg_getuint32("CHUNKS_READ_REP_LIMIT",10);
4859 if (repl>0) {
4860 MaxReadRepl = repl;
4861 }
4862 */
4863 if (cfg_isdefined("CHUNKS_LOOP_TIME")) {
4864 LoopTimeMin = cfg_getuint32("CHUNKS_LOOP_TIME",300); // deprecated option
4865 if (LoopTimeMin < MINLOOPTIME) {
4866 syslog(LOG_NOTICE,"CHUNKS_LOOP_TIME value too low (%"PRIu32") increased to %u",LoopTimeMin,MINLOOPTIME);
4867 LoopTimeMin = MINLOOPTIME;
4868 }
4869 if (LoopTimeMin > MAXLOOPTIME) {
4870 syslog(LOG_NOTICE,"CHUNKS_LOOP_TIME value too high (%"PRIu32") decreased to %u",LoopTimeMin,MAXLOOPTIME);
4871 LoopTimeMin = MAXLOOPTIME;
4872 }
4873 // HashSteps = 1+((HASHSIZE)/(LoopTimeMin*TICKSPERSECOND));
4874 HashCPTMax = 0xFFFFFFFF;
4875 } else {
4876 LoopTimeMin = cfg_getuint32("CHUNKS_LOOP_MIN_TIME",300);
4877 if (LoopTimeMin < MINLOOPTIME) {
4878 syslog(LOG_NOTICE,"CHUNKS_LOOP_MIN_TIME value too low (%"PRIu32") increased to %u",LoopTimeMin,MINLOOPTIME);
4879 LoopTimeMin = MINLOOPTIME;
4880 }
4881 if (LoopTimeMin > MAXLOOPTIME) {
4882 syslog(LOG_NOTICE,"CHUNKS_LOOP_MIN_TIME value too high (%"PRIu32") decreased to %u",LoopTimeMin,MAXLOOPTIME);
4883 LoopTimeMin = MAXLOOPTIME;
4884 }
4885 // HashSteps = 1+((HASHSIZE)/(LoopTimeMin*TICKSPERSECOND));
4886 cps = cfg_getuint32("CHUNKS_LOOP_MAX_CPS",100000);
4887 if (cps < MINCPS) {
4888 syslog(LOG_NOTICE,"CHUNKS_LOOP_MAX_CPS value too low (%"PRIu32") increased to %u",cps,MINCPS);
4889 cps = MINCPS;
4890 }
4891 if (cps > MAXCPS) {
4892 syslog(LOG_NOTICE,"CHUNKS_LOOP_MAX_CPS value too high (%"PRIu32") decreased to %u",cps,MAXCPS);
4893 cps = MAXCPS;
4894 }
4895 HashCPTMax = ((cps+(TICKSPERSECOND-1))/TICKSPERSECOND);
4896 }
4897
4898 if (DangerMaxLeng != oldDangerMaxLeng) {
4899 for (j=0 ; j<DANGER_PRIORITIES ; j++) {
4900 if (chunks_priority_leng[j]>0) {
4901 for (i=chunks_priority_tail[j] ; i!=chunks_priority_head[j] ; i = (i+1)%oldDangerMaxLeng) {
4902 if (chunks_priority_queue[j][i]!=NULL) {
4903 chunks_priority_queue[j][i]->ondangerlist=0;
4904 }
4905 }
4906 }
4907 free(chunks_priority_queue[j]);
4908 chunks_priority_queue[j] = (chunk**)malloc(sizeof(chunk*)*DangerMaxLeng);
4909 passert(chunks_priority_queue[j]);
4910 for (i=0 ; i<DangerMaxLeng ; i++) {
4911 chunks_priority_queue[j][i] = NULL;
4912 }
4913 chunks_priority_leng[j] = 0;
4914 chunks_priority_head[j] = 0;
4915 chunks_priority_tail[j] = 0;
4916 }
4917 }
4918 }
4919
chunk_strinit(void)4920 int chunk_strinit(void) {
4921 uint32_t i;
4922 uint32_t j;
4923 uint32_t cps;
4924 char *repstr;
4925
4926 starttime = main_time();
4927
4928 chunk_load_cfg_common();
4929
4930 MaxDelSoftLimit = cfg_getuint32("CHUNKS_SOFT_DEL_LIMIT",10);
4931 if (cfg_isdefined("CHUNKS_HARD_DEL_LIMIT")) {
4932 MaxDelHardLimit = cfg_getuint32("CHUNKS_HARD_DEL_LIMIT",25);
4933 if (MaxDelHardLimit<MaxDelSoftLimit) {
4934 MaxDelSoftLimit = MaxDelHardLimit;
4935 fprintf(stderr,"CHUNKS_SOFT_DEL_LIMIT is greater than CHUNKS_HARD_DEL_LIMIT - using CHUNKS_HARD_DEL_LIMIT for both\n");
4936 }
4937 } else {
4938 MaxDelHardLimit = 3 * MaxDelSoftLimit;
4939 }
4940 if (MaxDelSoftLimit==0) {
4941 fprintf(stderr,"delete limit is zero !!!\n");
4942 return -1;
4943 }
4944 TmpMaxDelFrac = MaxDelSoftLimit;
4945 TmpMaxDel = MaxDelSoftLimit;
4946
4947 repstr = cfg_getstr("CHUNKS_WRITE_REP_LIMIT","2,1,1,4");
4948 switch (chunk_parse_rep_list(repstr,MaxWriteRepl)) {
4949 case -1:
4950 fprintf(stderr,"write replication limit parse error !!!\n");
4951 return -1;
4952 case 0:
4953 fprintf(stderr,"write replication limit in old format - change limits to new format\n");
4954 }
4955 free(repstr);
4956 repstr = cfg_getstr("CHUNKS_READ_REP_LIMIT","10,5,2,5");
4957 switch (chunk_parse_rep_list(repstr,MaxReadRepl)) {
4958 case -1:
4959 fprintf(stderr,"read replication limit parse error !!!\n");
4960 return -1;
4961 case 0:
4962 fprintf(stderr,"read replication limit in old format - change limits to new format\n");
4963 }
4964 free(repstr);
4965 /*
4966 MaxWriteRepl = cfg_getuint32("CHUNKS_WRITE_REP_LIMIT",2);
4967 MaxReadRepl = cfg_getuint32("CHUNKS_READ_REP_LIMIT",10);
4968 if (MaxReadRepl==0) {
4969 fprintf(stderr,"read replication limit is zero !!!\n");
4970 return -1;
4971 }
4972 if (MaxWriteRepl==0) {
4973 fprintf(stderr,"write replication limit is zero !!!\n");
4974 return -1;
4975 }
4976 */
4977 if (cfg_isdefined("CHUNKS_LOOP_TIME")) {
4978 fprintf(stderr,"Defining loop time by CHUNKS_LOOP_TIME option is deprecated - use CHUNKS_LOOP_MAX_CPS and CHUNKS_LOOP_MIN_TIME\n");
4979 LoopTimeMin = cfg_getuint32("CHUNKS_LOOP_TIME",300); // deprecated option
4980 if (LoopTimeMin < MINLOOPTIME) {
4981 fprintf(stderr,"CHUNKS_LOOP_TIME value too low (%"PRIu32") increased to %u\n",LoopTimeMin,MINLOOPTIME);
4982 LoopTimeMin = MINLOOPTIME;
4983 }
4984 if (LoopTimeMin > MAXLOOPTIME) {
4985 fprintf(stderr,"CHUNKS_LOOP_TIME value too high (%"PRIu32") decreased to %u\n",LoopTimeMin,MAXLOOPTIME);
4986 LoopTimeMin = MAXLOOPTIME;
4987 }
4988 // HashSteps = 1+((HASHSIZE)/(LoopTimeMin*TICKSPERSECOND));
4989 HashCPTMax = 0xFFFFFFFF;
4990 } else {
4991 LoopTimeMin = cfg_getuint32("CHUNKS_LOOP_MIN_TIME",300);
4992 if (LoopTimeMin < MINLOOPTIME) {
4993 fprintf(stderr,"CHUNKS_LOOP_MIN_TIME value too low (%"PRIu32") increased to %u\n",LoopTimeMin,MINLOOPTIME);
4994 LoopTimeMin = MINLOOPTIME;
4995 }
4996 if (LoopTimeMin > MAXLOOPTIME) {
4997 fprintf(stderr,"CHUNKS_LOOP_MIN_TIME value too high (%"PRIu32") decreased to %u\n",LoopTimeMin,MAXLOOPTIME);
4998 LoopTimeMin = MAXLOOPTIME;
4999 }
5000 // HashSteps = 1+((HASHSIZE)/(LoopTimeMin*TICKSPERSECOND));
5001 cps = cfg_getuint32("CHUNKS_LOOP_MAX_CPS",100000);
5002 if (cps < MINCPS) {
5003 fprintf(stderr,"CHUNKS_LOOP_MAX_CPS value too low (%"PRIu32") increased to %u\n",cps,MINCPS);
5004 cps = MINCPS;
5005 }
5006 if (cps > MAXCPS) {
5007 fprintf(stderr,"CHUNKS_LOOP_MAX_CPS value too high (%"PRIu32") decreased to %u\n",cps,MAXCPS);
5008 cps = MAXCPS;
5009 }
5010 HashCPTMax = ((cps+(TICKSPERSECOND-1))/TICKSPERSECOND);
5011 }
5012
5013 chunk_hash_init();
5014 // for (i=0 ; i<HASHSIZE ; i++) {
5015 // chunkhash[i]=NULL;
5016 // }
5017 cstab = malloc(sizeof(csdata)*MAXCSCOUNT);
5018 passert(cstab);
5019 for (i=0 ; i<MAXCSCOUNT ; i++) {
5020 cstab[i].next = i+1;
5021 cstab[i].prev = i-1;
5022 cstab[i].opchunks = NULL;
5023 cstab[i].valid = 0;
5024 cstab[i].registered = 0;
5025 cstab[i].mfr_state = UNKNOWN_HARD;
5026 cstab[i].newchunkdelay = 0;
5027 cstab[i].lostchunkdelay = 0;
5028 }
5029 cstab[0].prev = MAXCSCOUNT;
5030 csfreehead = 0;
5031 csfreetail = MAXCSCOUNT-1;
5032 csusedhead = MAXCSCOUNT;
5033 allchunkcounts = malloc(sizeof(uint32_t*)*MAXSCLASS*2);
5034 passert(allchunkcounts);
5035 regularchunkcounts = malloc(sizeof(uint32_t*)*MAXSCLASS*2);
5036 passert(regularchunkcounts);
5037 for (i=0 ; i<MAXSCLASS*2 ; i++) {
5038 allchunkcounts[i] = malloc(sizeof(uint32_t)*11);
5039 passert(allchunkcounts[i]);
5040 regularchunkcounts[i] = malloc(sizeof(uint32_t)*11);
5041 passert(regularchunkcounts[i]);
5042 for (j=0 ; j<11 ; j++) {
5043 allchunkcounts[i][j]=0;
5044 regularchunkcounts[i][j]=0;
5045 }
5046 }
5047 jobshpos = 0;
5048 jobshstep = 1;
5049 jobshcnt = 0;
5050 jobshmax = 0;
5051 for (j=0 ; j<DANGER_PRIORITIES ; j++) {
5052 chunks_priority_queue[j] = (chunk**)malloc(sizeof(chunk*)*DangerMaxLeng);
5053 passert(chunks_priority_queue[j]);
5054 for (i=0 ; i<DangerMaxLeng ; i++) {
5055 chunks_priority_queue[j][i] = NULL;
5056 }
5057 chunks_priority_leng[j] = 0;
5058 chunks_priority_head[j] = 0;
5059 chunks_priority_tail[j] = 0;
5060 }
5061 chunk_do_jobs(NULL,JOBS_INIT,0,main_time(),0); // clear chunk loop internal data, and allocate tabs
5062 chunk_priority_queue_check(NULL,0); // allocate 'servers' tab
5063
5064 #ifdef MFSDEBUG
5065 mfsdebug(NULL);
5066 main_time_register(1,0,mfsdebug_flush);
5067 #endif
5068
5069 main_reload_register(chunk_reload);
5070 // main_time_register(1,0,chunk_jobs_main);
5071 main_msectime_register(1000/TICKSPERSECOND,0,chunk_jobs_main);
5072 main_destruct_register(chunk_term);
5073 return 1;
5074 }
5075