1 /*
2  * Copyright (C) 2021 Jakub Kruszona-Zawadzki, Core Technology Sp. z o.o.
3  *
4  * This file is part of MooseFS.
5  *
6  * MooseFS is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation, version 2 (only).
9  *
10  * MooseFS is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with MooseFS; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02111-1301, USA
18  * or visit http://www.gnu.org/licenses/gpl-2.0.html
19  */
20 
/* Build-time switches; they are defined ahead of the project headers included below. */
#define BUCKETS_MMAP_ALLOC   1
#define HASHTAB_PREALLOC     1
/* chunk_hash_move() keeps splitting buckets until it has examined at least this many chain entries. */
#define CHUNKHASH_MOVEFACTOR 5
24 
25 #ifdef HAVE_CONFIG_H
26 #include "config.h"
27 #endif
28 
29 #include <inttypes.h>
30 #include <stdlib.h>
31 #include <stdarg.h>
32 #include <fcntl.h>
33 #include <unistd.h>
34 #include <syslog.h>
35 #include <stdio.h>
36 #include <string.h>
37 #include <sys/types.h>
38 #include <sys/stat.h>
39 
40 #include "MFSCommunication.h"
41 
42 #include "main.h"
43 #include "cfg.h"
44 #include "bio.h"
45 #include "metadata.h"
46 #include "matocsserv.h"
47 #include "matoclserv.h"
48 #include "changelog.h"
49 #include "csdb.h"
50 #include "random.h"
51 #include "topology.h"
52 #include "chunks.h"
53 #include "filesystem.h"
54 #include "datapack.h"
55 #include "massert.h"
56 #include "hashfn.h"
57 #include "buckets.h"
58 #include "clocks.h"
59 #include "storageclass.h"
60 
/* Timing / throughput limits (their consumers are outside this part of the file). */
#define MINLOOPTIME    60
#define MAXLOOPTIME    7200
#define MAXCPS         10000000
#define MINCPS         10000
#define TICKSPERSECOND 50

/* NOTE(review): presumably the initial values of csdata.newchunkdelay / csdata.lostchunkdelay - confirm. */
#define NEWCHUNKDELAY  150
#define LOSTCHUNKDELAY 50

/*
 * Geometry of the two-level chunk hash table: chunkhashtab[] has HASHTAB_HISIZE
 * slots, each pointing to a segment of HASHTAB_LOSIZE bucket heads. A bucket
 * number is split into segment (bucket>>HASHTAB_LOBITS) and offset (bucket&HASHTAB_MASK).
 */
#define HASHTAB_LOBITS 24
#define HASHTAB_HISIZE (0x80000000>>(HASHTAB_LOBITS))
#define HASHTAB_LOSIZE (1<<HASHTAB_LOBITS)
#define HASHTAB_MASK (HASHTAB_LOSIZE-1)
#define HASHTAB_MOVEFACTOR 5
75 
76 // #define DISCLOOPRATIO 0x400
77 
78 //#define HASHSIZE 0x100000
79 //#define HASHPOS(chunkid) (((uint32_t)chunkid)&0xFFFFF)
80 
81 //#define DISCLOOPELEMENTS (HASHSIZE/0x400)
82 
83 
/* Job phases (their consumers are outside this part of the file). */
enum {
	JOBS_INIT,
	JOBS_EVERYLOOP,
	JOBS_EVERYTICK,
	JOBS_TERM
};

/* chunk.operation - values have to fit in the 3-bit chunk.operation field */
enum {
	NONE = 0,
	CREATE,
	SET_VERSION,
	DUPLICATE,
	TRUNCATE,
	DUPTRUNC,
	REPLICATE
};

/* Printable names indexed by chunk.operation; the last entry covers value 7, which has no enum name. */
static const char* opstr[] = {
	"NONE",
	"CREATE",
	"SET_VERSION",
	"DUPLICATE",
	"TRUNCATE",
	"DUPLICATE+TRUNCATE",
	"REPLICATE",
	"???"
};
99 
/*
 * slist.valid
 *   INVALID - got info from chunkserver (IO error etc.)  ->  to delete
 *   DEL     - deletion in progress
 *   BUSY    - operation in progress
 *   VALID   - ok
 *   WVER    - wrong version - repair or delete
 *   TDBUSY  - to be deleted + operation in progress
 *   TDVALID - ok, to be deleted
 *   TDWVER  - wrong version, to be deleted
 */
enum {
	INVALID,
	DEL,
	BUSY,
	VALID,
	WVER,
	TDBUSY,
	TDVALID,
	TDWVER
};

#ifdef MFSDEBUG
/* Printable names indexed by slist.valid (debug builds only). */
static const char* validstr[] = {
	"INVALID",
	"DELETE",
	"BUSY",
	"VALID",
	"WVER",
	"TDBUSY",
	"TDVALID",
	"TDWVER"
};
#endif
123 
/* Singly linked list node carrying a chunkserver id. */
typedef struct _discserv {
	uint16_t csid;
	struct _discserv *next;
} discserv;

/* Heads of two discserv lists; chunk_counters_in_progress() reports flag 1 while either one is non-empty. */
static discserv *discservers = NULL;
static discserv *discservers_next = NULL;
131 
132 /*
133 typedef struct _bcdata {
134 	void *ptr;
135 	uint32_t version;
136 } bcdata;
137 */
138 /*
139 typedef struct _hintlist {
140 	uint32_t ip;
141 	uint16_t port;
142 	struct _hintlist *next;
143 } hintlist;
144 */
/* One copy of a chunk on one chunkserver; copies of a chunk are chained from chunk.slisthead. */
typedef struct _slist {
	uint16_t csid;          /* chunkserver id, used as an index into cstab */
	uint8_t valid;          /* one of the slist.valid states (INVALID .. TDWVER) */
	uint32_t version;
	struct _slist *next;
} slist;
151 
152 /*
153 #define SLIST_BUCKET_SIZE 5000
154 
155 typedef struct _slist_bucket {
156 	slist bucket[SLIST_BUCKET_SIZE];
157 	uint32_t firstfree;
158 	struct _slist_bucket *next;
159 } slist_bucket;
160 
161 static slist_bucket *sbhead = NULL;
162 static slist *slfreehead = NULL;
163 */
164 
165 typedef struct chunk {
166 	uint64_t chunkid;
167 	uint32_t version;
168 	uint8_t sclassid;
169 	uint8_t allvalidcopies;
170 	uint8_t regularvalidcopies;
171 	unsigned ondangerlist:1;
172 	unsigned needverincrease:1;
173 	unsigned interrupted:1;
174 	unsigned writeinprogress:1;
175 	unsigned archflag:1;
176 	unsigned operation:3;
177 	uint32_t lockedto;
178 	uint32_t fcount;
179 	slist *slisthead;
180 	uint32_t *ftab;
181 	struct chunk *next;
182 } chunk;
183 
184 /*
185 #define CHUNK_BUCKET_SIZE 20000
186 typedef struct _chunk_bucket {
187 	chunk bucket[CHUNK_BUCKET_SIZE];
188 	uint32_t firstfree;
189 	struct _chunk_bucket *next;
190 } chunk_bucket;
191 
192 static chunk_bucket *cbhead = NULL;
193 static chunk *chfreehead = NULL;
194 */
195 static chunk **chunkhashtab[HASHTAB_HISIZE];
196 static uint32_t chunkrehashpos;
197 static uint32_t chunkhashsize;
198 static uint32_t chunkhashelem;
199 
200 static uint64_t nextchunkid=1;
201 #define LOCKTIMEOUT 120
202 
203 #define UNUSED_DELETE_TIMEOUT (86400*7)
204 
205 typedef struct _csopchunk {
206 	uint64_t chunkid;
207 	uint8_t status;
208 	struct _csopchunk *next;
209 } csopchunk;
210 
/*
 * csdata.mfr_state
 *   UNKNOWN_HARD     - unknown after disconnect or creation
 *   UNKNOWN_SOFT     - unknown, loop in progress
 *   CAN_BE_REMOVED   - can be removed, whole loop has passed
 *   REPL_IN_PROGRESS - chunks still need to be replicated, can't be removed
 *   WAS_IN_PROGRESS  - was in REPL_IN_PROGRESS during previous loop
 */
enum {
	UNKNOWN_HARD,
	UNKNOWN_SOFT,
	CAN_BE_REMOVED,
	REPL_IN_PROGRESS,
	WAS_IN_PROGRESS
};
218 
219 // state automaton:
220 //
221 //  1 - Chunk with number of valid copies less than goal (servers with chunks marked for removal only)
222 //  2 - Chunkserver disconnection (all servers)
223 //  3 - Loop end (all servers)
224 //
225 //                            1                 2              3
226 //  UNKNOWN_HARD     | REPL_IN_PROGRESS | UNKNOWN_HARD | UNKNOWN_SOFT
227 //  UNKNOWN_SOFT     | REPL_IN_PROGRESS | UNKNOWN_HARD | CAN_BE_REMOVED
228 //  CAN_BE_REMOVED   | REPL_IN_PROGRESS | UNKNOWN_HARD | CAN_BE_REMOVED
229 //  REPL_IN_PROGRESS | REPL_IN_PROGRESS | UNKNOWN_HARD | WAS_IN_PROGRESS
230 //  WAS_IN_PROGRESS  | REPL_IN_PROGRESS | UNKNOWN_HARD | CAN_BE_REMOVED
231 //
232 //  UNKNOWN_HARD,UNKNOWN_SOFT        - Unknown state
233 //  CAN_BE_REMOVED                   - Can be removed
234 //  REPL_IN_PROGRESS,WAS_IN_PROGRESS - In progress
235 
236 typedef struct _csdata {
237 	void *ptr;
238 	csopchunk *opchunks;
239 	uint8_t valid;
240 	unsigned registered:1;
241 	unsigned mfr_state:3;
242 	uint8_t newchunkdelay;
243 	uint8_t lostchunkdelay;
244 	uint32_t next;
245 	uint32_t prev;
246 } csdata;
247 
248 static csdata *cstab = NULL;
249 static uint32_t csfreehead = MAXCSCOUNT;
250 static uint32_t csfreetail = MAXCSCOUNT;
251 static uint32_t csusedhead = MAXCSCOUNT;
252 static uint32_t opsinprogress = 0;
253 static uint16_t csregisterinprogress = 0;
254 static uint8_t csreceivingchunks = 0;
255 
256 #define DANGER_PRIORITIES 7
257 #define REPLICATION_DANGER_PRIORITIES 6
258 
259 #define DPRIORITY_ENDANGERED_HIGHGOAL 0
260 #define DPRIORITY_ENDANGERED 1
261 #define DPRIORITY_UNDERGOAL_MFR 2
262 #define DPRIORITY_MFR 3
263 #define DPRIORITY_UNDERGOAL 4
264 #define DPRIORITY_WRONGLABEL 5
265 #define DPRIORITY_OVERGOAL 6
266 
267 static chunk** chunks_priority_queue[DANGER_PRIORITIES];
268 static uint32_t chunks_priority_leng[DANGER_PRIORITIES];
269 static uint32_t chunks_priority_head[DANGER_PRIORITIES];
270 static uint32_t chunks_priority_tail[DANGER_PRIORITIES];
271 
272 static double chunks_priority_mincpsperc[DANGER_PRIORITIES] = {1.0,0.1,0.1,0.01,0.05,0.01,0.3};
273 
274 static uint32_t ReplicationsDelayInit=60;
275 static uint32_t DangerMaxLeng=1000000;
276 
277 static double MaxWriteRepl[4];
278 static double MaxReadRepl[4];
279 static uint32_t MaxDelSoftLimit;
280 static uint32_t MaxDelHardLimit;
281 static double TmpMaxDelFrac;
282 static uint32_t TmpMaxDel;
283 static uint8_t ReplicationsRespectTopology;
284 static uint32_t CreationsRespectTopology;
285 static uint32_t LoopTimeMin;
286 //static uint32_t HashSteps;
287 static uint32_t HashCPTMax;
288 static double AcceptableDifference;
289 
290 static uint8_t DoNotUseSameIP;
291 static uint8_t DoNotUseSameRack;
292 static uint32_t LabelUniqueMask;
293 
294 static uint32_t jobshpos;
295 static uint32_t jobshstep;
296 static uint32_t jobshcnt;
297 static uint32_t jobshmax;
298 
299 static uint32_t starttime;
300 
301 typedef struct _job_info {
302 	uint32_t del_invalid;
303 	uint32_t del_unused;
304 	uint32_t del_diskclean;
305 	uint32_t del_overgoal;
306 	uint32_t copy_undergoal;
307 	uint32_t copy_wronglabels;
308 } job_info;
309 
310 typedef struct _loop_info {
311 	job_info done,notdone;
312 	uint32_t locked_unused;
313 	uint32_t locked_used;
314 	uint32_t copy_rebalance;
315 	uint32_t labels_dont_match;
316 } loop_info;
317 
318 static loop_info chunksinfo = {{0,0,0,0,0,0},{0,0,0,0,0,0},0,0,0,0};
319 static uint32_t chunksinfo_loopstart=0,chunksinfo_loopend=0;
320 
321 static uint64_t lastchunkid=0;
322 static chunk* lastchunkptr=NULL;
323 
324 static uint32_t chunks;
325 
326 static uint32_t last_rebalance=0;
327 
328 static uint32_t **allchunkcounts;
329 static uint32_t **regularchunkcounts;
330 
331 static uint32_t stats_chunkops[12];
332 
333 
334 #ifdef MFSDEBUG
/*
 * Debug logger (compiled only with MFSDEBUG).
 *
 * format==NULL is a control call: it opens "mfsdebug.txt" in append mode when
 * the log is not open yet, otherwise it flushes the stream.
 * Any other call writes one line prefixed with main_time(); the message is
 * silently dropped while the log file is not open.
 */
static void mfsdebug(const char *format,...) {
	static FILE *logfd = NULL;
	va_list args;

	if (format!=NULL) {
		if (logfd==NULL) { // log not opened (yet) - nothing to do
			return;
		}
		fprintf(logfd,"%"PRIu32" : ",(uint32_t)(main_time()));
		va_start(args,format);
		vfprintf(logfd,format,args);
		va_end(args);
		fputc('\n',logfd);
		return;
	}
	// control call
	if (logfd!=NULL) {
		fflush(logfd);
	} else {
		logfd = fopen("mfsdebug.txt","a");
	}
}
353 
/* Public wrapper around the control call mfsdebug(NULL): opens the debug log or flushes it. */
void mfsdebug_flush(void) {
	mfsdebug((const char*)NULL);
}
357 #endif
358 
359 // input:
360 //  - number of desired copies -> labelcnt
361 //  - definition of desired labels -> labelmasks
362 //  - number of available servers -> servcnt
363 //  - pointers to servers -> servers
364 // output:
365 //  [0 .. labelscnt-1] -> matching server index (+labelcnt)
366 //  [labelcnt .. labelcnt+servcnt-1] -> matching label definition
367 // example:
368 //   3,6,[A,A,A],[B,A,A,C,A,A] -> [4,5,7,-1,0,1,-1,2,-1,-1]
369 
do_perfect_match(uint32_t labelcnt,uint32_t servcnt,uint32_t ** labelmasks,uint16_t * servers)370 int32_t* do_perfect_match(uint32_t labelcnt,uint32_t servcnt,uint32_t **labelmasks,uint16_t *servers) {
371 	uint32_t s,i,l,x,v,vi,sp,rsp,sid,sids,gr;
372 	int32_t t;
373 	static int32_t *imatching = NULL;
374 	static int32_t *matching = NULL;
375 	static int32_t *augment = NULL;
376 	static uint8_t *visited = NULL;
377 	static int32_t *stack = NULL;
378 	static int32_t *group = NULL;
379 	static int32_t *grnode = NULL;
380 	static uint32_t *sidval = NULL;
381 	static int32_t *sidpos = NULL;
382 	static uint32_t tablength = 0;
383 	static uint32_t stablength = 0;
384 
385 	if (labelcnt + servcnt > tablength || imatching==NULL || matching==NULL || augment==NULL || visited==NULL || stack==NULL) {
386 		tablength = 100 + 2 * (labelcnt + servcnt);
387 		if (imatching) {
388 			free(imatching);
389 		}
390 		if (matching) {
391 			free(matching);
392 		}
393 		if (augment) {
394 			free(augment);
395 		}
396 		if (visited) {
397 			free(visited);
398 		}
399 		if (stack) {
400 			free(stack);
401 		}
402 		imatching = malloc(sizeof(int32_t)*tablength);
403 		passert(imatching);
404 		matching = malloc(sizeof(int32_t)*tablength);
405 		passert(matching);
406 		augment = malloc(sizeof(int32_t)*tablength);
407 		passert(augment);
408 		visited = malloc(sizeof(uint8_t)*tablength);
409 		passert(visited);
410 		stack = malloc(sizeof(int32_t)*tablength);
411 		passert(stack);
412 	}
413 	if (servcnt > stablength || sidval==NULL || sidpos==NULL || grnode==NULL || group==NULL) {
414 		stablength = 100 + 2 * servcnt;
415 		if (sidval) {
416 			free(sidval);
417 		}
418 		if (sidpos) {
419 			free(sidpos);
420 		}
421 		if (grnode) {
422 			free(grnode);
423 		}
424 		if (group) {
425 			free(group);
426 		}
427 		sidval = malloc(sizeof(uint32_t)*stablength);
428 		passert(sidval);
429 		sidpos = malloc(sizeof(int32_t)*stablength);
430 		passert(sidpos);
431 		grnode = malloc(sizeof(int32_t)*stablength);
432 		passert(grnode);
433 		group = malloc(sizeof(int32_t)*stablength);
434 		passert(group);
435 	}
436 
437 	s = servcnt + labelcnt;
438 	if (s==0) {
439 		return matching;
440 	}
441 
442 	for (i=0 ; i<s ; i++) {
443 		imatching[i] = -1;
444 		matching[i] = -1;
445 		augment[i] = -1;
446 	}
447 
448 	if (servcnt==0 || labelcnt==0) {
449 		return matching;
450 	}
451 
452 	sp = 0;
453 
454 	// calc groups
455 	sids = 0;
456 	for (i=0 ; i<servcnt ; i++) {
457 		if (DoNotUseSameIP) {
458 			sid = matocsserv_server_get_ip(cstab[servers[i]].ptr);
459 		} else if (DoNotUseSameRack) {
460 			sid = topology_get_rackid(matocsserv_server_get_ip(cstab[servers[i]].ptr));
461 		} else {
462 			sid = matocsserv_server_get_labelmask(cstab[servers[i]].ptr) & LabelUniqueMask;
463 		}
464 		if (sid==0) {
465 			group[i] = i;
466 		} else {
467 			for (l=0 ; l<sids && sid!=sidval[l] ; l++) { }
468 			if (l==sids) {
469 				sidval[l] = sid;
470 				sidpos[l] = i;
471 				sids++;
472 			}
473 			group[i] = sidpos[l];
474 		}
475 	}
476 
477 	for (l=0 ; l<labelcnt ; l++) {
478 		if (imatching[l]==-1) {
479 			for (i=0 ; i<servcnt+labelcnt ; i++) {
480 				visited[i] = 0;
481 			}
482 			visited[l] = 1;
483 			augment[l] = -1;
484 			stack[sp++] = l; // push
485 			while (sp!=0) { // stack not empty
486 				x = stack[--sp]; // pop
487 				if (x<labelcnt) {
488 					rsp = sp;
489 					for (v=0 ; v<servcnt ; v++) {
490 #ifdef MATCH_LEFT
491 						vi = v;
492 #else
493 						vi = servcnt-1-v;
494 #endif
495 						gr = group[vi];
496 						if (visited[labelcnt+gr]==0) {
497 							if (matocsserv_server_has_labels(cstab[servers[vi]].ptr,labelmasks[x])) {
498 								visited[labelcnt+gr] = 1;
499 								augment[labelcnt+gr] = x;
500 								grnode[gr] = vi;
501 								stack[sp++] = labelcnt + gr; // push
502 							}
503 						}
504 					}
505 					// reverse stack from rsp to sp
506 					for (v=0 ; v<(sp-rsp)/2 ; v++) {
507 						// swap stack[rsp+v] , stack[sp-1-v]
508 						t = stack[sp-1-v];
509 						stack[sp-1-v] = stack[rsp+v];
510 						stack[rsp+v] = t;
511 					}
512 				} else if (imatching[x] >= 0) {
513 					augment[imatching[x]] = x;
514 					visited[imatching[x]] = 1;
515 					stack[sp++] = imatching[x];
516 				} else {
517 					while (augment[x]>=0) {
518 						if (x>=labelcnt) {
519 							imatching[x] = augment[x];
520 							imatching[augment[x]] = x;
521 							matching[augment[x]] = grnode[x-labelcnt] + labelcnt;
522 						}
523 						x = augment[x];
524 					}
525 					sp = 0; // clear stack;
526 				}
527 			}
528 		}
529 	}
530 
531 	// add reverse matching
532 	for (i=0 ; i<labelcnt ; i++) {
533 		t = matching[i];
534 		if (t>=0) {
535 			matching[t] = i;
536 		}
537 	}
538 	return matching;
539 }
540 
chunk_stats(uint32_t chunkops[12])541 void chunk_stats(uint32_t chunkops[12]) {
542 	uint32_t i;
543 	for (i=0 ; i<12 ; i++) {
544 		chunkops[i] = stats_chunkops[i];
545 		stats_chunkops[i] = 0;
546 	}
547 }
548 
549 CREATE_BUCKET_ALLOCATOR(slist,slist,10000000/sizeof(slist))
550 
551 CREATE_BUCKET_ALLOCATOR(chunk,chunk,10000000/sizeof(chunk))
552 
chunk_get_memusage(uint64_t allocated[3],uint64_t used[3])553 void chunk_get_memusage(uint64_t allocated[3],uint64_t used[3]) {
554 	allocated[0] = sizeof(chunk*)*chunkrehashpos;
555 	used[0] = sizeof(chunk*)*chunkhashelem;
556 	chunk_getusage(allocated+1,used+1);
557 	slist_getusage(allocated+2,used+2);
558 }
559 
560 /*
561 static inline uint32_t chunk_calc_hash_size(uint32_t elements) {
562 	uint32_t res=1;
563 	while (elements) {
564 		elements>>=1;
565 		res<<=1;
566 	}
567 	if (res==0) {
568 		res = UINT32_C(0x80000000);
569 	}
570 	if (res<HASHTAB_LOSIZE) {
571 		return HASHTAB_LOSIZE;
572 	}
573 	return res;
574 }
575 */
576 
chunk_hash_init(void)577 static inline void chunk_hash_init(void) {
578 	uint16_t i;
579 	chunkhashsize = 0;
580 	chunkhashelem = 0;
581 	chunkrehashpos = 0;
582 	for (i=0 ; i<HASHTAB_HISIZE ; i++) {
583 		chunkhashtab[i] = NULL;
584 	}
585 }
586 
chunk_hash_cleanup(void)587 static inline void chunk_hash_cleanup(void) {
588 	uint16_t i;
589 	chunkhashelem = 0;
590 	chunkhashsize = 0;
591 	chunkrehashpos = 0;
592 	for (i=0 ; i<HASHTAB_HISIZE ; i++) {
593 		if (chunkhashtab[i]!=NULL) {
594 #ifdef HAVE_MMAP
595 			munmap(chunkhashtab[i],sizeof(chunk*)*HASHTAB_LOSIZE);
596 #else
597 			free(chunkhashtab[i]);
598 #endif
599 		}
600 		chunkhashtab[i] = NULL;
601 	}
602 }
603 
chunk_hash_rehash(void)604 static inline void chunk_hash_rehash(void) {
605 	uint16_t i;
606 	chunkrehashpos = chunkhashsize;
607 	chunkhashsize *= 2;
608 	for (i=(chunkhashsize>>HASHTAB_LOBITS)/2 ; i<chunkhashsize>>HASHTAB_LOBITS ; i++) {
609 #ifdef HAVE_MMAP
610 		chunkhashtab[i] = mmap(NULL,sizeof(chunk*)*HASHTAB_LOSIZE,PROT_READ | PROT_WRITE, MAP_ANON | MAP_PRIVATE,-1,0);
611 #else
612 		chunkhashtab[i] = malloc(sizeof(chunk*)*HASHTAB_LOSIZE);
613 #endif
614 		passert(chunkhashtab[i]);
615 	}
616 }
617 
chunk_hash_move(void)618 static inline void chunk_hash_move(void) {
619 	uint32_t hash;
620 	uint32_t mask;
621 	uint32_t moved=0;
622 	chunk **chptr,**chptralt,*c;
623 	mask = chunkhashsize-1;
624 	do {
625 		if (chunkrehashpos>=chunkhashsize) { // rehash complete
626 			chunkrehashpos = chunkhashsize;
627 			return;
628 		}
629 		chptr = chunkhashtab[(chunkrehashpos - (chunkhashsize/2)) >> HASHTAB_LOBITS] + (chunkrehashpos & HASHTAB_MASK);
630 		chptralt = chunkhashtab[chunkrehashpos >> HASHTAB_LOBITS] + (chunkrehashpos & HASHTAB_MASK);
631 		*chptralt = NULL;
632 		while ((c=*chptr)!=NULL) {
633 			hash = hash32(c->chunkid) & mask;
634 			if (hash==chunkrehashpos) {
635 				*chptralt = c;
636 				*chptr = c->next;
637 				chptralt = &(c->next);
638 				c->next = NULL;
639 			} else {
640 				chptr = &(c->next);
641 			}
642 			moved++;
643 		}
644 		chunkrehashpos++;
645 	} while (moved<CHUNKHASH_MOVEFACTOR);
646 }
647 
chunk_hash_find(uint64_t chunkid)648 static inline chunk* chunk_hash_find(uint64_t chunkid) {
649 	chunk *c;
650 	uint32_t hash;
651 
652 	if (chunkhashsize==0) {
653 		return NULL;
654 	}
655 	hash = hash32(chunkid) & (chunkhashsize-1);
656 	if (chunkrehashpos<chunkhashsize) {
657 		chunk_hash_move();
658 		if (hash >= chunkrehashpos) {
659 			hash -= chunkhashsize/2;
660 		}
661 	}
662 	for (c=chunkhashtab[hash>>HASHTAB_LOBITS][hash&HASHTAB_MASK] ; c ; c=c->next) {
663 		if (c->chunkid==chunkid) {
664 			return c;
665 		}
666 	}
667 	return NULL;
668 }
669 
/*
 * Unlinks chunk c from the hash table (the chunk itself is not freed) and
 * decrements the element counter. Does nothing when the table is empty or c
 * is not found in its bucket. Advances a running rehash by one step first.
 */
static inline void chunk_hash_delete(chunk *c) {
	chunk **link;
	uint32_t pos;

	if (chunkhashsize==0) {
		return;
	}
	pos = hash32(c->chunkid) & (chunkhashsize-1);
	if (chunkrehashpos<chunkhashsize) { // rehash in progress
		chunk_hash_move();
		if (pos >= chunkrehashpos) {
			pos -= chunkhashsize/2;
		}
	}
	for (link = chunkhashtab[pos>>HASHTAB_LOBITS] + (pos&HASHTAB_MASK) ; *link!=NULL ; link = &((*link)->next)) {
		if (*link==c) {
			*link = c->next;
			chunkhashelem--;
			return;
		}
	}
}
694 
/*
 * Inserts chunk c at the head of its bucket.
 *
 * The first insertion allocates the initial table (HASHTAB_LOSIZE buckets).
 * While a rehash is running the call advances it by one step; otherwise, when
 * the element count exceeds the bucket count and the table may still grow,
 * a new rehash is started after the insertion.
 */
static inline void chunk_hash_add(chunk *c) {
	uint16_t seg;
	uint32_t pos;
	uint8_t rehashing;
	chunk **slot;

	if (chunkhashsize==0) { // first use - allocate and clear the initial table
		chunkhashsize = HASHTAB_LOSIZE; //chunk_calc_hash_size(maxnodeid);
		chunkrehashpos = chunkhashsize;
		chunkhashelem = 0;
		for (seg=0 ; seg<chunkhashsize>>HASHTAB_LOBITS ; seg++) {
#ifdef HAVE_MMAP
			chunkhashtab[seg] = mmap(NULL,sizeof(chunk*)*HASHTAB_LOSIZE,PROT_READ | PROT_WRITE, MAP_ANON | MAP_PRIVATE,-1,0);
#else
			chunkhashtab[seg] = malloc(sizeof(chunk*)*HASHTAB_LOSIZE);
#endif
			passert(chunkhashtab[seg]);
			// check on one slot whether zeroed bytes read back as NULL; when they do - clear with memset, when not - slot by slot
			memset(chunkhashtab[seg],0,sizeof(chunk*));
			if (chunkhashtab[seg][0]==NULL) {
				memset(chunkhashtab[seg],0,sizeof(chunk*)*HASHTAB_LOSIZE);
			} else {
				for (pos=0 ; pos<HASHTAB_LOSIZE ; pos++) {
					chunkhashtab[seg][pos] = NULL;
				}
			}
		}
	}
	pos = hash32(c->chunkid) & (chunkhashsize-1);
	rehashing = (chunkrehashpos<chunkhashsize)?1:0;
	if (rehashing) {
		chunk_hash_move();
		if (pos >= chunkrehashpos) { // target bucket not populated yet - use its lower half counterpart
			pos -= chunkhashsize/2;
		}
	}
	slot = chunkhashtab[pos>>HASHTAB_LOBITS] + (pos&HASHTAB_MASK);
	c->next = *slot;
	*slot = c;
	chunkhashelem++;
	if (rehashing==0 && chunkhashelem>chunkhashsize && (chunkhashsize>>HASHTAB_LOBITS)<HASHTAB_HISIZE) {
		chunk_hash_rehash();
	}
}
738 
chunk_new(uint64_t chunkid)739 chunk* chunk_new(uint64_t chunkid) {
740 	chunk *newchunk;
741 	newchunk = chunk_malloc();
742 //#ifdef METARESTORE
743 //	printf("N%"PRIu64"\n",chunkid);
744 //#endif
745 	chunks++;
746 	allchunkcounts[0][0]++;
747 	regularchunkcounts[0][0]++;
748 	newchunk->chunkid = chunkid;
749 	newchunk->version = 0;
750 	newchunk->sclassid = 0;
751 	newchunk->lockedto = 0;
752 	newchunk->allvalidcopies = 0;
753 	newchunk->regularvalidcopies = 0;
754 	newchunk->needverincrease = 1;
755 	newchunk->ondangerlist = 0;
756 	newchunk->interrupted = 0;
757 	newchunk->writeinprogress = 0;
758 	newchunk->archflag = 0;
759 	newchunk->operation = NONE;
760 	newchunk->slisthead = NULL;
761 	newchunk->fcount = 0;
762 //	newchunk->flisthead = NULL;
763 	newchunk->ftab = NULL;
764 	lastchunkid = chunkid;
765 	lastchunkptr = newchunk;
766 	chunk_hash_add(newchunk);
767 	return newchunk;
768 }
769 
chunk_find(uint64_t chunkid)770 chunk* chunk_find(uint64_t chunkid) {
771 	chunk *c;
772 //#ifdef METARESTORE
773 //	printf("F%"PRIu64"\n",chunkid);
774 //#endif
775 	if (lastchunkid==chunkid) {
776 		return lastchunkptr;
777 	}
778 	c = chunk_hash_find(chunkid);
779 	if (c) {
780 		lastchunkid = chunkid;
781 		lastchunkptr = c;
782 	}
783 	return c;
784 }
785 
/*
 * Removes chunk c: invalidates the lookup cache when it points to c, updates
 * the global counters (column 0 of the row selected by sclassid/archflag),
 * unlinks the chunk from the hash table and returns it to the allocator.
 */
void chunk_delete(chunk* c) {
	uint32_t row = c->sclassid;

	if (c->archflag) {
		row += MAXSCLASS;
	}
	if (c==lastchunkptr) {
		lastchunkptr = NULL;
		lastchunkid = 0;
	}
	chunks--;
	allchunkcounts[row][0]--;
	regularchunkcounts[row][0]--;
	chunk_hash_delete(c);
	chunk_free(c);
}
799 
chunk_state_change(uint8_t oldsclassid,uint8_t newsclassid,uint8_t oldarchflag,uint8_t newarchflag,uint8_t oldavc,uint8_t newavc,uint8_t oldrvc,uint8_t newrvc)800 static inline void chunk_state_change(uint8_t oldsclassid,uint8_t newsclassid,uint8_t oldarchflag,uint8_t newarchflag,uint8_t oldavc,uint8_t newavc,uint8_t oldrvc,uint8_t newrvc) {
801 	uint32_t oldindx = oldsclassid + (oldarchflag?MAXSCLASS:0);
802 	uint32_t newindx = newsclassid + (newarchflag?MAXSCLASS:0);
803 	if (oldavc>9) {
804 		oldavc=10;
805 	}
806 	if (newavc>9) {
807 		newavc=10;
808 	}
809 	if (oldrvc>9) {
810 		oldrvc=10;
811 	}
812 	if (newrvc>9) {
813 		newrvc=10;
814 	}
815 	allchunkcounts[oldindx][oldavc]--;
816 	allchunkcounts[newindx][newavc]++;
817 	regularchunkcounts[oldindx][oldrvc]--;
818 	regularchunkcounts[newindx][newrvc]++;
819 }
820 
chunk_count(void)821 uint32_t chunk_count(void) {
822 	return chunks;
823 }
824 
chunk_sclass_counters(uint8_t sclassid,uint8_t archflag,uint8_t goal,uint64_t * undergoal,uint64_t * exactgoal,uint64_t * overgoal)825 void chunk_sclass_counters(uint8_t sclassid,uint8_t archflag,uint8_t goal,uint64_t *undergoal,uint64_t *exactgoal,uint64_t *overgoal) {
826 	uint32_t indx = sclassid + (archflag?MAXSCLASS:0);
827 	uint32_t i;
828 
829 	*undergoal = 0;
830 	*exactgoal = 0;
831 	*overgoal = 0;
832 	for (i=0 ; i<11 ; i++) {
833 		if (i<goal) {
834 			*undergoal += regularchunkcounts[indx][i];
835 		} else if (i>goal) {
836 			*overgoal += regularchunkcounts[indx][i];
837 		} else {
838 			*exactgoal = regularchunkcounts[indx][i];
839 		}
840 	}
841 }
842 
chunk_info(uint32_t * allchunks,uint32_t * allcopies,uint32_t * regularvalidcopies)843 void chunk_info(uint32_t *allchunks,uint32_t *allcopies,uint32_t *regularvalidcopies) {
844 	uint32_t i,j,ag,rg;
845 	*allchunks = chunks;
846 	*allcopies = 0;
847 	*regularvalidcopies = 0;
848 	for (i=1 ; i<=10 ; i++) {
849 		ag=0;
850 		rg=0;
851 		for (j=0 ; j<MAXSCLASS*2 ; j++) {
852 			ag += allchunkcounts[j][i];
853 			rg += regularchunkcounts[j][i];
854 		}
855 		*allcopies += ag*i;
856 		*regularvalidcopies += rg*i;
857 	}
858 }
859 
chunk_get_missing_count(void)860 uint32_t chunk_get_missing_count(void) {
861 	uint32_t res=0;
862 	uint32_t i;
863 
864 	for (i=1 ; i<MAXSCLASS*2 ; i++) {
865 		res+=allchunkcounts[i][0];
866 	}
867 	return res;
868 }
869 
chunk_counters_in_progress(void)870 uint8_t chunk_counters_in_progress(void) {
871 //	syslog(LOG_NOTICE,"discservers: %p , discservers_next: %p , csregisterinprogress: %"PRIu16,discservers,discservers_next,csregisterinprogress);
872 	return ((discservers!=NULL || discservers_next!=NULL)?1:0)|((csregisterinprogress>0)?2:0)|csreceivingchunks;
873 }
874 
chunk_store_chunkcounters(uint8_t * buff,uint8_t matrixid)875 void chunk_store_chunkcounters(uint8_t *buff,uint8_t matrixid) {
876 	uint32_t i,j,indx;
877 	uint32_t counts[11][11];
878 
879 	for (i=0 ; i<=10 ; i++) {
880 		for (j=0 ; j<=10 ; j++) {
881 			counts[i][j]=0;
882 		}
883 	}
884 
885 	if (matrixid==0) {
886 		for (i=0 ; i<MAXSCLASS*2 ; i++) {
887 			indx = sclass_get_keeparch_goal(i%MAXSCLASS,i/MAXSCLASS);
888 			if (indx>10) {
889 				indx=10;
890 			}
891 			for (j=0 ; j<=10 ; j++) {
892 				counts[indx][j] += allchunkcounts[i][j];
893 			}
894 		}
895 	} else if (matrixid==1) {
896 		for (i=0 ; i<MAXSCLASS*2 ; i++) {
897 			indx = sclass_get_keeparch_goal(i%MAXSCLASS,i/MAXSCLASS);
898 			if (indx>10) {
899 				indx=10;
900 			}
901 			for (j=0 ; j<=10 ; j++) {
902 				counts[indx][j] += regularchunkcounts[i][j];
903 			}
904 		}
905 	}
906 
907 	for (i=0 ; i<=10 ; i++) {
908 		for (j=0 ; j<=10 ; j++) {
909 			put32bit(&buff,counts[i][j]);
910 		}
911 	}
912 }
913 
914 /* --- */
915 
/*
 * Puts chunk c into priority queue j (a ring of DangerMaxLeng slots) unless it
 * is already marked as queued. When the ring is full the slot at 'head' is
 * overwritten: the chunk stored there (if any) loses its ondangerlist mark and
 * 'tail' follows 'head'.
 */
static inline void chunk_priority_enqueue(uint8_t j,chunk *c) {
	uint32_t pos,len;
	uint8_t full;
	chunk *evicted;

	if (c->ondangerlist) {
		return;
	}
	len = chunks_priority_leng[j];
	pos = chunks_priority_head[j];
	full = (len>=DangerMaxLeng)?1:0;
	if (full) {
		evicted = chunks_priority_queue[j][pos];
		if (evicted!=NULL) {
			evicted->ondangerlist = 0;
		}
	}
	chunks_priority_queue[j][pos] = c;
	c->ondangerlist = 1;
	pos = (pos+1)%DangerMaxLeng;
	chunks_priority_head[j] = pos;
	if (full) {
		chunks_priority_tail[j] = pos;
	} else {
		chunks_priority_leng[j] = len+1;
	}
}
938 
/*
 * Checks whether chunk c needs attention and, when it does, puts it into the
 * proper priority queue.
 *
 * Control calls (c==NULL):
 *   checklabels==0 - allocate the internal server table (if not allocated yet)
 *   checklabels==1 - release the internal server table
 *
 * Regular calls: the chunk is ignored when it is already queued, the server
 * table is not allocated, it has storage class 0, it belongs to no file or
 * lockedto+3600 has not passed yet. Otherwise valid (vc) and "to be deleted"
 * valid (tdc) copies are counted and - when checklabels is set, the chunk has
 * at least 'goal' valid copies and labels or uniqueness rules are in use -
 * labels are matched against the servers holding valid copies. A chunk that
 * has any copy and either vc!=goal or unmatched labels is enqueued.
 */
static inline void chunk_priority_queue_check(chunk *c,uint8_t checklabels) {
	slist *s;
	uint32_t vc,tdc;
	uint32_t goal;
	uint8_t j;
	static uint16_t *servers = NULL;
	uint32_t servcnt;
	uint32_t **labelmasks;
	uint32_t labelcnt;
	uint32_t lab;
	int32_t *matching;
	uint8_t wronglabels;
#ifdef MFSDEBUG
	uint8_t debug;
#endif

	if (c==NULL) {
		if (servers==NULL && checklabels==0) {
			servers = malloc(sizeof(uint16_t)*MAXCSCOUNT);
			passert(servers);
		}
		if (servers!=NULL && checklabels==1) {
			free(servers);
			// forget the released table: regular calls test 'servers' against NULL
			// and a later init call has to allocate it again
			servers = NULL;
		}
		return;
	}

#ifdef MFSDEBUG
	debug = ((c->chunkid&0xFFF)==0)?1:0;
#endif
	if (c->ondangerlist || servers==NULL || c->sclassid==0 || c->fcount==0 || c->lockedto+3600>=(uint32_t)main_time()) {
#ifdef MFSDEBUG
		if (debug) {
			mfsdebug("chunk_priority_queue_check ; chunkid=%016"PRIX64" ; ignore: ondangerlist=%u ; servers%c=NULL ; sclassid=%u ; fcount=%u ; lockedto=%"PRIu32,c->chunkid,c->ondangerlist,(servers==NULL)?'=':'!',c->sclassid,c->fcount,c->lockedto);
		}
#endif
		return;
	}
	// count regular valid copies and valid copies on "marked for removal" disks
	vc = 0;
	tdc = 0;
	for (s=c->slisthead ; s ; s=s->next) {
		switch (s->valid) {
		case TDVALID:
			tdc++;
			break;
		case VALID:
			vc++;
			break;
		}
	}
	wronglabels = 0;
	goal = sclass_get_keeparch_goal(c->sclassid,c->archflag);
	if (((DoNotUseSameIP | DoNotUseSameRack | LabelUniqueMask) || sclass_has_keeparch_labels(c->sclassid,c->archflag)) && vc >= goal && checklabels) {
		servcnt = 0;
		for (s=c->slisthead ; s ; s=s->next) {
			if (s->valid==VALID) {
				servers[servcnt++] = s->csid;
			}
		}
		labelcnt = sclass_get_keeparch_labelmasks(c->sclassid,c->archflag,&labelmasks);
		matching = do_perfect_match(labelcnt,servcnt,labelmasks,servers);
		// index has the same width as labelcnt, so the loop always terminates
		for (lab=0 ; lab<labelcnt ; lab++) {
			if (matching[lab]<0) { // there are unmatched labels
				wronglabels = 1;
				break;
			}
		}
	}
	if (vc+tdc > 0 && (vc != goal || wronglabels)) { // wrong-goal chunk
		if (vc+tdc==1 && goal>2) { // highest priority - chunks with one copy and high goal
			j = DPRIORITY_ENDANGERED_HIGHGOAL;
		} else if (vc+tdc==1 && goal==2) { // next priority - chunks with one copy
			j = DPRIORITY_ENDANGERED;
		} else if (vc==1 && tdc>0) { // next priority - chunks on one regular disk and some "marked for removal" disks
			j = DPRIORITY_UNDERGOAL_MFR;
		} else if (tdc>0) { // next priority - chunks on "marked for removal" disks
			j = DPRIORITY_MFR;
		} else if (vc < goal) { // next priority - standard undergoal chunks
			j = DPRIORITY_UNDERGOAL;
		} else if (wronglabels) { // next priority - changed labels
			j = DPRIORITY_WRONGLABEL;
		} else { // lowest priority - overgoal
			j = DPRIORITY_OVERGOAL;
		}
#ifdef MFSDEBUG
		if (debug) {
			mfsdebug("chunk_priority_queue_check ; chunkid=%016"PRIX64" ; enqueue: priority=%u ; vc=%u ; tdc=%u ; goal=%u ; wronglabels=%u",c->chunkid,j,vc,tdc,goal,wronglabels);
		}
#endif
		chunk_priority_enqueue(j,c);
#ifdef MFSDEBUG
	} else if (debug) {
		mfsdebug("chunk_priority_queue_check ; chunkid=%016"PRIX64" ; exit: vc=%u ; tdc=%u ; goal=%u ; wronglabels=%u",c->chunkid,vc,tdc,goal,wronglabels);
#endif
	}
}
1034 
1035 /* --- */
1036 
chunk_addopchunk(uint16_t csid,uint64_t chunkid)1037 void chunk_addopchunk(uint16_t csid,uint64_t chunkid) {
1038 	csopchunk *csop;
1039 	csop = malloc(sizeof(csopchunk));
1040 	csop->chunkid = chunkid;
1041 	csop->status = MFS_ERROR_MISMATCH;
1042 	csop->next = cstab[csid].opchunks;
1043 	cstab[csid].opchunks = csop;
1044 	opsinprogress++;
1045 	return;
1046 }
1047 
chunk_statusopchunk(uint16_t csid,uint64_t chunkid,uint8_t status)1048 void chunk_statusopchunk(uint16_t csid,uint64_t chunkid,uint8_t status) {
1049 	csopchunk *csop;
1050 	for (csop=cstab[csid].opchunks ; csop!=NULL; csop=csop->next) {
1051 		if (csop->chunkid==chunkid) {
1052 			csop->status = status;
1053 			return;
1054 		}
1055 	}
1056 }
1057 
chunk_delopchunk(uint16_t csid,uint64_t chunkid)1058 uint8_t chunk_delopchunk(uint16_t csid,uint64_t chunkid) {
1059 	csopchunk **csopp,*csop;
1060 	uint8_t status;
1061 
1062 	status = MFS_ERROR_MISMATCH;
1063 	csopp = &(cstab[csid].opchunks);
1064 	while ((csop = (*csopp))) {
1065 		if (csop->chunkid == chunkid) {
1066 			status = csop->status;
1067 			*csopp = csop->next;
1068 			free(csop);
1069 			if (opsinprogress>0) {
1070 				opsinprogress--;
1071 			}
1072 		} else {
1073 			csopp = &(csop->next);
1074 		}
1075 	}
1076 	return status;
1077 }
1078 
chunk_creation_servers(uint16_t csids[MAXCSCOUNT],uint8_t sclassid,uint8_t * olflag,uint32_t clientip)1079 static inline uint16_t chunk_creation_servers(uint16_t csids[MAXCSCOUNT],uint8_t sclassid,uint8_t *olflag,uint32_t clientip) {
1080 	uint16_t tmpcsids[MAXCSCOUNT];
1081 	int32_t *matching;
1082 	uint32_t **labelmasks;
1083 	uint8_t labelcnt;
1084 	uint8_t sclass_mode;
1085 	uint16_t servcount;
1086 	uint16_t goodlabelscount;
1087 	uint16_t overloaded;
1088 	uint32_t i,j;
1089 	uint32_t dist;
1090 	uint16_t cpos,fpos;
1091 	int32_t x;
1092 
1093 	servcount = matocsserv_getservers_wrandom(csids,&overloaded);
1094 	if (servcount==0) {
1095 		*olflag = (overloaded>0)?1:0;
1096 		return 0;
1097 	}
1098 	sclass_mode = sclass_get_mode(sclassid);
1099 	labelcnt = sclass_get_create_goal(sclassid);
1100 	if (servcount < labelcnt && servcount + overloaded >= labelcnt) {
1101 		*olflag = 1;
1102 		return 0;
1103 	} else {
1104 		*olflag = 0;
1105 	}
1106 
1107 	if (CreationsRespectTopology>0) {
1108 		cpos = 0;
1109 		fpos = MAXCSCOUNT;
1110 		for (i=0 ; i<servcount ; i++) {
1111 			dist = topology_distance(matocsserv_server_get_ip(cstab[csids[i]].ptr),clientip);
1112 			if (dist < CreationsRespectTopology) { // close
1113 				tmpcsids[cpos++] = csids[i];
1114 			} else { // far
1115 				tmpcsids[--fpos] = csids[i];
1116 			}
1117 		}
1118 		if (cpos!=0 && fpos!=MAXCSCOUNT) {
1119 			for (i=0 ; i<cpos ; i++) {
1120 				csids[i] = tmpcsids[i];
1121 			}
1122 			for (i=fpos,j=servcount-1 ; i<MAXCSCOUNT ; i++,j--) {
1123 				csids[j] = tmpcsids[i];
1124 			}
1125 		}
1126 	}
1127 
1128 	if (sclass_has_create_labels(sclassid)) {
1129 		labelcnt = sclass_get_create_labelmasks(sclassid,&labelmasks);
1130 
1131 		// reverse server list
1132 		for (i=0 ; i<servcount/2 ; i++) {
1133 			x = csids[i];
1134 			csids[i] = csids[servcount-1-i];
1135 			csids[servcount-1-i] = x;
1136 		}
1137 
1138 		// match servers to labels
1139 		matching = do_perfect_match(labelcnt,servcount,labelmasks,csids);
1140 
1141 		if (sclass_mode != SCLASS_MODE_STRICT) {
1142 			goodlabelscount = 0;
1143 			// extend matching to fulfill goal
1144 			for (i=0 ; i<labelcnt ; i++) {
1145 				if (matching[i]<0) {
1146 					for (j=0 ; j<servcount ; j++) {
1147 						if (matching[labelcnt+servcount-j-1]<0) {
1148 							matching[i] = labelcnt+servcount-j-1;
1149 							matching[labelcnt+servcount-j-1] = i;
1150 							break;
1151 						}
1152 					}
1153 				} else {
1154 					goodlabelscount++;
1155 				}
1156 				if (matching[i]<0) { // no more servers
1157 					break;
1158 				}
1159 			}
1160 
1161 			if (sclass_mode == SCLASS_MODE_STD) {
1162 				if (goodlabelscount < labelcnt && goodlabelscount + overloaded >= labelcnt) {
1163 					*olflag = 1;
1164 					return 0;
1165 				}
1166 			}
1167 
1168 		}
1169 
1170 		// setting servers in proper order
1171 		i = 0;
1172 		j = servcount-1;
1173 		while (i<j) {
1174 			while (i<j && matching[labelcnt+i]>=0) {
1175 				i++;
1176 			}
1177 			while (i<j && matching[labelcnt+j]<0) {
1178 				j--;
1179 			}
1180 			if (i<j) {
1181 				x = matching[labelcnt+i];
1182 				matching[labelcnt+i] = matching[labelcnt+j];
1183 				matching[labelcnt+j] = x;
1184 				x = csids[i];
1185 				csids[i] = csids[j];
1186 				csids[j] = x;
1187 			}
1188 		}
1189 		if (sclass_mode == SCLASS_MODE_STRICT) {
1190 			if (i < labelcnt && i + overloaded >= labelcnt) {
1191 				*olflag = 1;
1192 				return 0;
1193 			}
1194 			return i;
1195 		}
1196 	}
1197 	return servcount;
1198 }
1199 
/*
 * Asks every usable copy of chunk 'c' to move to version c->version+1.
 *
 * Copies in state INVALID, DEL, WVER or TDWVER are skipped; every other copy
 * is switched to BUSY (or TDBUSY for TDVALID/TDBUSY copies), gets a
 * setchunkversion request and a pending-operation record.
 * When at least one request was sent the chunk enters the SET_VERSION
 * operation and the new version is written to the changelog; otherwise
 * MFS_ERROR_CHUNKLOST is reported through matoclserv_chunk_status().
 */
void chunk_emergency_increase_version(chunk *c) {
	slist *copy;
	uint32_t sent = 0;

	for (copy=c->slisthead ; copy!=NULL ; copy=copy->next) {
		if (copy->valid==INVALID || copy->valid==DEL || copy->valid==WVER || copy->valid==TDWVER) {
			continue;
		}
		copy->valid = (copy->valid==TDVALID || copy->valid==TDBUSY)?TDBUSY:BUSY;
		copy->version = c->version+1;
		stats_chunkops[CHUNK_OP_CHANGE_TRY]++;
		matocsserv_send_setchunkversion(cstab[copy->csid].ptr,c->chunkid,c->version+1,c->version);
		chunk_addopchunk(copy->csid,c->chunkid);
		sent++;
	}
	if (sent==0) { // should never happen - callers expect at least one usable copy
		matoclserv_chunk_status(c->chunkid,MFS_ERROR_CHUNKLOST);
		return;
	}
	c->interrupted = 0;
	c->operation = SET_VERSION;
	c->version++;
	changelog("%"PRIu32"|SETVERSION(%"PRIu64",%"PRIu32")",(uint32_t)main_time(),c->chunkid,c->version);
}
1228 
/*
 * Drops from chunk 'c' every copy stored on a chunkserver whose cstab entry
 * is no longer valid and then settles the operation (if any) those copies
 * were part of.
 *
 * Returns 1 when the chunk itself has been deleted (the caller must not use
 * 'c' afterwards) and 0 in every other case.
 */
static inline int chunk_remove_disconnected_chunks(chunk *c) {
	uint8_t opfinished,validcopies,disc;
	uint8_t verfixed;
	slist *s,**st;

	// fast path - no disconnected servers are being processed
	if (discservers==NULL && discservers_next==NULL) {
		return 0;
	}
	disc = 0;
	st = &(c->slisthead);
	while (*st) {
		s = *st;
		if (!cstab[s->csid].valid) {
			// keep the global statistics and per-chunk counters in sync with the removed copy
			if (s->valid==TDBUSY || s->valid==TDVALID) {
				chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies-1,c->regularvalidcopies,c->regularvalidcopies);
				c->allvalidcopies--;
			}
			if (s->valid==BUSY || s->valid==VALID) {
				chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies-1,c->regularvalidcopies,c->regularvalidcopies-1);
				c->allvalidcopies--;
				c->regularvalidcopies--;
			}
			if (c->writeinprogress && s->valid!=INVALID && s->valid!=DEL && s->valid!=WVER && s->valid!=TDWVER) { // pro forma
				matocsserv_write_counters(cstab[s->csid].ptr,0);
			}
			c->needverincrease = 1;
			*st = s->next;
			slist_free(s);
			disc = 1;
		} else {
			st = &(s->next);
		}
	}
	if (disc==0) {
		return 0;
	}
	// unlocked, unreferenced chunk without any copy left - remove it completely
	if (c->lockedto<(uint32_t)main_time() && c->slisthead==NULL && c->fcount==0 && c->ondangerlist==0 && chunk_counters_in_progress()==0 && csdb_have_all_servers()) {
		changelog("%"PRIu32"|CHUNKDEL(%"PRIu64",%"PRIu32")",(uint32_t)main_time(),c->chunkid,c->version);
		chunk_delete(c);
		return 1;
	}
	if (c->operation!=NONE) {
		validcopies=0;
		opfinished=1;
		for (s=c->slisthead ; s ; s=s->next) {
			if (s->valid==BUSY || s->valid==TDBUSY) {
				opfinished=0;
			}
			if (s->valid==VALID || s->valid==TDVALID) {
				validcopies=1;
			}
		}

		if (opfinished && validcopies==0 && (c->operation==SET_VERSION || c->operation==TRUNCATE)) { // we know that version increase was just not completed, so all WVER chunks with version exactly one lower than chunk version are actually VALID copies
			verfixed = 0;
			for (s=c->slisthead ; s!=NULL ; s=s->next) {
				if (s->version+1==c->version) {
					if (s->valid==TDWVER) {
						verfixed = 1;
						s->valid = TDVALID;
						chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies+1,c->regularvalidcopies,c->regularvalidcopies);
						c->allvalidcopies++;
					} else if (s->valid==WVER) {
						verfixed = 1;
						s->valid = VALID;
						chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies+1,c->regularvalidcopies,c->regularvalidcopies+1);
						c->allvalidcopies++;
						c->regularvalidcopies++;
					}
				}
			}
			// roll the version back exactly once, and only after every copy has been
			// compared against the unfinished version (doing it inside the loop would
			// decrement repeatedly and break the comparison for the remaining copies)
			if (verfixed) {
				c->version--;
				changelog("%"PRIu32"|SETVERSION(%"PRIu64",%"PRIu32")",(uint32_t)main_time(),c->chunkid,c->version);
			}
			// we continue because we still want to return status not done to matoclserv module
		}

		if (opfinished) {
			uint8_t nospace,status;
			// collect statuses of the remaining copies; "no space" is reported only
			// when no copy returned anything other than MISMATCH/NOSPACE
			nospace = 1;
			for (s=c->slisthead ; s ; s=s->next) {
				status = chunk_delopchunk(s->csid,c->chunkid);
				if (status!=MFS_ERROR_MISMATCH && status!=MFS_ERROR_NOSPACE) {
					nospace = 0;
				}
			}
			if (c->operation==REPLICATE) {
				c->operation = NONE;
				c->lockedto = 0;
				matoclserv_chunk_unlocked(c->chunkid,c);
			} else {
				if (validcopies) {
					chunk_emergency_increase_version(c);
				} else {
					if (nospace) {
						matoclserv_chunk_status(c->chunkid,MFS_ERROR_NOSPACE);
					} else {
						matoclserv_chunk_status(c->chunkid,MFS_ERROR_NOTDONE);
					}
					c->operation = NONE;
				}
			}
		} else {
			// some copies are still busy - remember that the operation lost a participant
			if (c->operation!=REPLICATE) {
				c->interrupted = 1;
			}
		}
	}
	chunk_priority_queue_check(c,1);
	return 0;
}
1341 
chunk_mr_increase_version(uint64_t chunkid)1342 int chunk_mr_increase_version(uint64_t chunkid) {
1343 	chunk *c;
1344 	c = chunk_find(chunkid);
1345 	if (c==NULL) {
1346 		return MFS_ERROR_NOCHUNK;
1347 	}
1348 	c->version++;
1349 	meta_version_inc();
1350 	return MFS_STATUS_OK;
1351 }
1352 
1353 /* --- */
1354 
/*
 * Recomputes c->sclassid from the per-class file counters in c->ftab
 * (ftab[0] holds the table length, ftab[id] the number of files of class id).
 *
 * The class with the highest sclass_get_keepmax_goal() wins. On a tie:
 *  - while the current choice has id<=9 and no fallback was made, the later
 *    class replaces it,
 *  - when both the current choice and the candidate have id>9, the goal value
 *    itself is used as the class id and remembered as a fallback.
 * NOTE(review): ids 1..9 look like the plain "goal N" classes - confirm in storageclass.
 */
static inline void chunk_find_sclassid(chunk *c) {
	uint32_t idx;
	uint8_t goal;
	uint8_t bestgoal = 0;
	uint8_t chosen = 0;
	uint8_t fallback = 0;

	for (idx=1 ; idx<c->ftab[0] ; idx++) {
		if (c->ftab[idx]==0) { // no file uses this class
			continue;
		}
		goal = sclass_get_keepmax_goal(idx);
		if (goal>bestgoal) {
			bestgoal = goal;
			chosen = idx;
			fallback = 0;
			continue;
		}
		if (goal!=bestgoal) {
			continue;
		}
		if (chosen<=9 && fallback==0) {
			chosen = idx;
		} else if (chosen>9 && idx>9) {
			chosen = goal;
			fallback = 1;
		}
	}
	massert(chosen>0,"wrong labels set");
	c->sclassid = chosen;
}
1383 
chunk_change_file(uint64_t chunkid,uint8_t prevsclassid,uint8_t newsclassid)1384 int chunk_change_file(uint64_t chunkid,uint8_t prevsclassid,uint8_t newsclassid) {
1385 	chunk *c;
1386 	uint8_t oldsclassid;
1387 
1388 	if (prevsclassid==newsclassid) {
1389 		return MFS_STATUS_OK;
1390 	}
1391 	c = chunk_find(chunkid);
1392 	if (c==NULL) {
1393 		return MFS_ERROR_NOCHUNK;
1394 	}
1395 	if (c->fcount==0) {
1396 		syslog(LOG_WARNING,"serious structure inconsistency: (chunkid:%016"PRIX64")",c->chunkid);
1397 		return MFS_ERROR_CHUNKLOST;	// MFS_ERROR_STRUCTURE
1398 	}
1399 	oldsclassid = c->sclassid;
1400 	if (c->fcount==1) {
1401 		c->sclassid = newsclassid;
1402 	} else {
1403 		if (c->ftab==NULL) {
1404 			uint32_t ftableng = prevsclassid;
1405 			if (newsclassid > ftableng) {
1406 				ftableng = newsclassid;
1407 			}
1408 			ftableng++;
1409 			c->ftab = malloc(sizeof(uint32_t)*(ftableng+1));
1410 			passert(c->ftab);
1411 			memset(c->ftab,0,sizeof(uint32_t)*(ftableng+1));
1412 			c->ftab[0] = ftableng+1;
1413 			massert(c->sclassid==prevsclassid,"wrong labels set");
1414 			c->ftab[prevsclassid] = c->fcount-1;
1415 			c->ftab[newsclassid] = 1;
1416 			chunk_find_sclassid(c);
1417 		} else {
1418 			if (newsclassid >= c->ftab[0]) {
1419 				c->ftab = realloc(c->ftab,sizeof(uint32_t)*(newsclassid+1));
1420 				passert(c->ftab);
1421 				memset(c->ftab+c->ftab[0],0,sizeof(uint32_t)*(newsclassid+1-c->ftab[0]));
1422 				c->ftab[0] = newsclassid+1;
1423 			}
1424 			massert(c->ftab[prevsclassid]>0,"wrong ftab entry");
1425 			c->ftab[prevsclassid]--;
1426 			c->ftab[newsclassid]++;
1427 			chunk_find_sclassid(c);
1428 		}
1429 	}
1430 	if (oldsclassid!=c->sclassid) {
1431 		chunk_state_change(oldsclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies,c->regularvalidcopies,c->regularvalidcopies);
1432 		chunk_priority_queue_check(c,1);
1433 	} else {
1434 		chunk_priority_queue_check(c,0);
1435 	}
1436 	return MFS_STATUS_OK;
1437 }
1438 
/*
 * Removes one file reference of storage class 'sclassid' from chunk 'c'.
 *
 * When the last reference disappears and 'delete_timeout' is non-zero, the
 * chunk is locked until now+delete_timeout.
 * Returns MFS_STATUS_OK, or MFS_ERROR_CHUNKLOST when the chunk had no
 * references to begin with.
 */
static inline int chunk_delete_file_int(chunk *c,uint8_t sclassid,uint32_t delete_timeout) {
	uint8_t before;

	if (c->fcount==0) {
		syslog(LOG_WARNING,"serious structure inconsistency: (chunkid:%016"PRIX64")",c->chunkid);
		return MFS_ERROR_CHUNKLOST;	// MFS_ERROR_STRUCTURE
	}
	massert(sclassid>0,"wrong labels set");
	before = c->sclassid;
	if (c->fcount!=1) {
		if (c->ftab!=NULL) {
			c->ftab[sclassid]--;
			chunk_find_sclassid(c);
		}
		c->fcount--;
		// a single remaining reference needs no per-class table
		if (c->fcount==1 && c->ftab!=NULL) {
			free(c->ftab);
			c->ftab = NULL;
		}
	} else {
		// last reference gone
		c->sclassid = 0;
		c->fcount = 0;
	}
	if (before!=c->sclassid) {
		chunk_state_change(before,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies,c->regularvalidcopies,c->regularvalidcopies);
	}
	if (delete_timeout>0 && c->fcount==0) {
		c->lockedto = (uint32_t)main_time()+delete_timeout;
	}
	return MFS_STATUS_OK;
}
1473 
/*
 * Adds one file reference of storage class 'sclassid' to chunk 'c' and
 * refreshes the chunk's effective class. Always returns MFS_STATUS_OK.
 */
static inline int chunk_add_file_int(chunk *c,uint8_t sclassid) {
	uint8_t before;
	uint32_t slots;

	massert(sclassid>0,"wrong labels set");
	before = c->sclassid;
	if (c->fcount==0) {
		// first reference defines the class
		c->sclassid = sclassid;
		c->fcount = 1;
	} else if (sclassid==c->sclassid) {
		// same class as the effective one - only the counters change
		c->fcount++;
		if (c->ftab!=NULL) {
			c->ftab[sclassid]++;
		}
	} else {
		if (c->ftab==NULL) {
			// references start to differ - create the per-class counter table
			slots = (sclassid > c->sclassid) ? sclassid : c->sclassid;
			slots += 2;
			c->ftab = malloc(sizeof(uint32_t)*slots);
			passert(c->ftab);
			memset(c->ftab,0,sizeof(uint32_t)*slots);
			c->ftab[0] = slots;
			c->ftab[c->sclassid] = c->fcount;
			c->ftab[sclassid] = 1;
		} else {
			if (sclassid >= c->ftab[0]) {
				// extend the table and zero the newly added tail
				c->ftab = realloc(c->ftab,sizeof(uint32_t)*(sclassid+1));
				passert(c->ftab);
				memset(c->ftab+c->ftab[0],0,sizeof(uint32_t)*(sclassid+1-c->ftab[0]));
				c->ftab[0] = sclassid+1;
			}
			c->ftab[sclassid]++;
		}
		c->fcount++;
		chunk_find_sclassid(c);
	}
	if (before!=c->sclassid) {
		chunk_state_change(before,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies,c->regularvalidcopies,c->regularvalidcopies);
	}
	return MFS_STATUS_OK;
}
1519 
chunk_delete_file(uint64_t chunkid,uint8_t sclassid)1520 int chunk_delete_file(uint64_t chunkid,uint8_t sclassid) {
1521 	chunk *c;
1522 	c = chunk_find(chunkid);
1523 	if (c==NULL) {
1524 		return MFS_ERROR_NOCHUNK;
1525 	}
1526 	return chunk_delete_file_int(c,sclassid,0);
1527 }
1528 
chunk_add_file(uint64_t chunkid,uint8_t sclassid)1529 int chunk_add_file(uint64_t chunkid,uint8_t sclassid) {
1530 	chunk *c;
1531 	c = chunk_find(chunkid);
1532 	if (c==NULL) {
1533 		return MFS_ERROR_NOCHUNK;
1534 	}
1535 	return chunk_add_file_int(c,sclassid);
1536 }
1537 
/*
 * Switches the "write in progress" flag of chunk 'c' on (x!=0) or off (x==0)
 * and forwards 'x' to matocsserv_write_counters() for every copy that is not
 * INVALID, DEL, WVER or TDWVER. Does nothing when the flag already has the
 * requested state.
 */
static inline void chunk_write_counters(chunk *c,uint8_t x) {
	slist *copy;

	// requested state equals the current one - nothing to propagate
	if ((x!=0) == (c->writeinprogress!=0)) {
		return;
	}
	c->writeinprogress = (x!=0)?1:0;
	for (copy=c->slisthead ; copy!=NULL ; copy=copy->next) {
		if (copy->valid==INVALID || copy->valid==DEL || copy->valid==WVER || copy->valid==TDWVER) {
			continue;
		}
		matocsserv_write_counters(cstab[copy->csid].ptr,x);
	}
}
1559 
chunk_locked_or_busy(void * cptr)1560 int chunk_locked_or_busy(void *cptr) {
1561 	chunk *c = (chunk*)cptr;
1562 	return (c->lockedto<(uint32_t)(main_time()) && c->operation==NONE)?0:1;
1563 }
1564 
chunk_get_validcopies(uint64_t chunkid,uint8_t * vcopies)1565 int chunk_get_validcopies(uint64_t chunkid,uint8_t *vcopies) {
1566 	chunk *c;
1567 	*vcopies = 0;
1568 	c = chunk_find(chunkid);
1569 	if (c==NULL) {
1570 		return MFS_ERROR_NOCHUNK;
1571 	}
1572 	*vcopies = c->allvalidcopies;
1573 	return MFS_STATUS_OK;
1574 }
1575 
chunk_get_archflag(uint64_t chunkid,uint8_t * archflag)1576 int chunk_get_archflag(uint64_t chunkid,uint8_t *archflag) {
1577 	chunk *c;
1578 	c = chunk_find(chunkid);
1579 	if (c==NULL) {
1580 		return MFS_ERROR_NOCHUNK;
1581 	}
1582 	*archflag = c->archflag;
1583 	return MFS_STATUS_OK;
1584 }
1585 
chunk_univ_archflag(uint64_t chunkid,uint8_t archflag,uint32_t * archflagchanged)1586 int chunk_univ_archflag(uint64_t chunkid,uint8_t archflag,uint32_t *archflagchanged) {
1587 	chunk *c;
1588 	c = chunk_find(chunkid);
1589 	if (c==NULL) {
1590 		return MFS_ERROR_NOCHUNK;
1591 	}
1592 	if (archflag != c->archflag) {
1593 		chunk_state_change(c->sclassid,c->sclassid,c->archflag,archflag,c->allvalidcopies,c->allvalidcopies,c->regularvalidcopies,c->regularvalidcopies);
1594 		c->archflag = archflag;
1595 		chunk_priority_queue_check(c,1);
1596 		(*archflagchanged)++;
1597 	}
1598 	return MFS_STATUS_OK;
1599 }
1600 
1601 // CHUNK_FLOOP_NOTFOUND
1602 // CHUNK_FLOOP_DELETED
1603 // CHUNK_FLOOP_MISSING_NOCOPY
1604 // CHUNK_FLOOP_MISSING_INVALID
1605 // CHUNK_FLOOP_MISSING_WRONGVERSION
1606 // CHUNK_FLOOP_UNDERGOAL_AFLAG_NOT_CHANGED
1607 // CHUNK_FLOOP_UNDERGOAL_AFLAG_CHANGED
1608 // CHUNK_FLOOP_OK_AFLAG_NOT_CHANGED
1609 // CHUNK_FLOOP_OK_AFLAG_CHANGED
chunk_fileloop_task(uint64_t chunkid,uint8_t sclassid,uint8_t aftereof,uint8_t archflag)1610 chunkfloop chunk_fileloop_task(uint64_t chunkid,uint8_t sclassid,uint8_t aftereof,uint8_t archflag) {
1611 	chunk *c;
1612 	slist *s;
1613 	uint8_t aflagchg;
1614 	c = chunk_find(chunkid);
1615 	if (c==NULL) {
1616 		return CHUNK_FLOOP_NOTFOUND;
1617 	}
1618 	if (c->allvalidcopies==0 && aftereof && c->lockedto<(uint32_t)(main_time()) && c->operation==NONE) {
1619 		chunk_delete_file_int(c,sclassid,UNUSED_DELETE_TIMEOUT);
1620 		return CHUNK_FLOOP_DELETED;
1621 	}
1622 	if (c->allvalidcopies==0) {
1623 		if (c->slisthead==NULL) {
1624 			return CHUNK_FLOOP_MISSING_NOCOPY;
1625 		}
1626 		for (s=c->slisthead ; s ; s=s->next) {
1627 			if (s->valid==WVER || s->valid==TDWVER) {
1628 				return CHUNK_FLOOP_MISSING_WRONGVERSION;
1629 			}
1630 		}
1631 		return CHUNK_FLOOP_MISSING_INVALID;
1632 	}
1633 	if (archflag==1 && c->archflag==0) {
1634 		chunk_state_change(c->sclassid,c->sclassid,c->archflag,archflag,c->allvalidcopies,c->allvalidcopies,c->regularvalidcopies,c->regularvalidcopies);
1635 		c->archflag = archflag;
1636 		chunk_priority_queue_check(c,1);
1637 		aflagchg = 1;
1638 	} else {
1639 		aflagchg = 0;
1640 	}
1641 	if (c->allvalidcopies < sclass_get_keeparch_goal(c->sclassid,c->archflag)) {
1642 		return aflagchg?CHUNK_FLOOP_UNDERGOAL_AFLAG_CHANGED:CHUNK_FLOOP_UNDERGOAL_AFLAG_NOT_CHANGED;
1643 	}
1644 	return aflagchg?CHUNK_FLOOP_OK_AFLAG_CHANGED:CHUNK_FLOOP_OK_AFLAG_NOT_CHANGED;
1645 }
1646 
chunk_read_check(uint32_t ts,uint64_t chunkid)1647 int chunk_read_check(uint32_t ts,uint64_t chunkid) {
1648 	chunk *c;
1649 	c = chunk_find(chunkid);
1650 	if (c==NULL) {
1651 		return MFS_ERROR_NOCHUNK;
1652 	}
1653 	if (c->lockedto>=ts) {
1654 		return MFS_ERROR_LOCKED;
1655 	}
1656 	if (c->operation != NONE) {
1657 		return MFS_ERROR_CHUNKBUSY;
1658 	}
1659 	return MFS_STATUS_OK;
1660 }
1661 
chunk_univ_multi_modify(uint32_t ts,uint8_t mr,uint8_t continueop,uint64_t * nchunkid,uint64_t ochunkid,uint8_t sclassid,uint8_t * opflag,uint32_t clientip)1662 int chunk_univ_multi_modify(uint32_t ts,uint8_t mr,uint8_t continueop,uint64_t *nchunkid,uint64_t ochunkid,uint8_t sclassid,uint8_t *opflag,uint32_t clientip) {
1663 	uint16_t csids[MAXCSCOUNT];
1664 	static void **chosen = NULL;
1665 	static uint32_t chosenleng = 0;
1666 	uint16_t servcount=0;
1667 	uint32_t vc;
1668 	uint8_t overloaded;
1669 	slist *os,*s;
1670 	uint32_t i;
1671 	chunk *oc,*c;
1672 	uint8_t csstable,csalldata;
1673 	uint8_t cschanges;
1674 
1675 	if (ts>(starttime+10) && csregisterinprogress==0) {
1676 		csstable = 1;
1677 	} else {
1678 		csstable = 0;
1679 	}
1680 
1681 	cschanges = (csstable==0 || (csreceivingchunks&2))?1:0;
1682 
1683 	if (chunk_counters_in_progress()==0 && csdb_have_all_servers()) {
1684 		csalldata = 1;
1685 	} else {
1686 		csalldata = 0;
1687 	}
1688 
1689 	if (ochunkid==0) {	// new chunk
1690 		if (mr==0) {
1691 			servcount = chunk_creation_servers(csids,sclassid,&overloaded,clientip);
1692 			if (servcount==0) {
1693 				if (overloaded || csalldata==0) {
1694 					return MFS_ERROR_EAGAIN; // try again for ever
1695 				} else {
1696 					uint16_t scount;
1697 					scount = matocsserv_servers_count();
1698 					if (scount>0 && csstable) {
1699 						return MFS_ERROR_NOSPACE; // return error
1700 					} else {
1701 						return MFS_ERROR_NOCHUNKSERVERS; // try again
1702 					}
1703 				}
1704 			}
1705 			c = chunk_new(nextchunkid++);
1706 			c->version = 1;
1707 			c->interrupted = 0;
1708 			c->operation = CREATE;
1709 			chunk_add_file_int(c,sclassid);
1710 			if (servcount<sclass_get_create_goal(sclassid)) {
1711 				c->allvalidcopies = servcount;
1712 				c->regularvalidcopies = servcount;
1713 			} else {
1714 				c->allvalidcopies = sclass_get_create_goal(sclassid);
1715 				c->regularvalidcopies = sclass_get_create_goal(sclassid);
1716 			}
1717 			if (c->allvalidcopies>chosenleng) {
1718 				chosenleng = c->allvalidcopies+10;
1719 				chosen = malloc(sizeof(void*)*chosenleng);
1720 				passert(chosen);
1721 			}
1722 			for (i=0 ; i<c->allvalidcopies ; i++) {
1723 				s = slist_malloc();
1724 				s->csid = csids[i];
1725 				s->valid = BUSY;
1726 				s->version = c->version;
1727 				s->next = c->slisthead;
1728 				c->slisthead = s;
1729 				chosen[i] = cstab[s->csid].ptr;
1730 				stats_chunkops[CHUNK_OP_CREATE_TRY]++;
1731 				matocsserv_send_createchunk(cstab[s->csid].ptr,c->chunkid,c->version);
1732 				chunk_addopchunk(s->csid,c->chunkid);
1733 			}
1734 			matocsserv_useservers_wrandom(chosen,c->allvalidcopies);
1735 			chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,0,c->allvalidcopies,0,c->regularvalidcopies);
1736 			*opflag=1;
1737 			*nchunkid = c->chunkid;
1738 		} else {
1739 			if (*nchunkid != nextchunkid) {
1740 				return MFS_ERROR_MISMATCH;
1741 			}
1742 			c = chunk_new(nextchunkid++);
1743 			c->version = 1;
1744 			chunk_add_file_int(c,sclassid);
1745 		}
1746 	} else {
1747 		c = NULL;
1748 		oc = chunk_find(ochunkid);
1749 		if (oc && mr==0) {
1750 			if (chunk_remove_disconnected_chunks(oc)) {
1751 				oc = NULL;
1752 			}
1753 		}
1754 		if (oc==NULL) {
1755 			return MFS_ERROR_NOCHUNK;
1756 		}
1757 		if (mr==0 && oc->lockedto>=ts && continueop==0) {
1758 			return MFS_ERROR_LOCKED;
1759 		}
1760 		if (oc->fcount==1) {
1761 			c = oc;
1762 			if (mr==0) {
1763 				*nchunkid = ochunkid;
1764 				if (c->operation!=NONE) {
1765 					return MFS_ERROR_CHUNKBUSY;
1766 				}
1767 				if (cschanges) {
1768 					vc = 0;
1769 					for (s=c->slisthead ; s ; s=s->next) {
1770 						if (s->valid==VALID) {
1771 							vc++;
1772 						}
1773 					}
1774 					if (vc < sclass_get_keeparch_goal(c->sclassid,c->archflag)) {
1775 						return MFS_ERROR_EAGAIN; // just try again later
1776 					}
1777 				}
1778 				if (c->needverincrease) {
1779 					i=0;
1780 					for (s=c->slisthead ;s ; s=s->next) {
1781 						if (s->valid!=INVALID && s->valid!=DEL && s->valid!=WVER && s->valid!=TDWVER) {
1782 							if (s->valid==TDVALID || s->valid==TDBUSY) {
1783 								s->valid = TDBUSY;
1784 							} else {
1785 								s->valid = BUSY;
1786 							}
1787 							s->version = c->version+1;
1788 							stats_chunkops[CHUNK_OP_CHANGE_TRY]++;
1789 							matocsserv_send_setchunkversion(cstab[s->csid].ptr,ochunkid,c->version+1,c->version);
1790 							chunk_addopchunk(s->csid,c->chunkid);
1791 							i++;
1792 						}
1793 					}
1794 					if (i>0) {
1795 						c->interrupted = 0;
1796 						c->operation = SET_VERSION;
1797 						c->version++;
1798 						*opflag = 1;
1799 					} else {
1800 						if (csalldata) {
1801 							return MFS_ERROR_CHUNKLOST; // return error
1802 						} else {
1803 							return MFS_ERROR_CSNOTPRESENT; // try again
1804 						}
1805 					}
1806 				} else {
1807 					*opflag = 0;
1808 				}
1809 			} else {
1810 				if (*nchunkid != ochunkid) {
1811 					return MFS_ERROR_MISMATCH;
1812 				}
1813 				if (*opflag) {
1814 					c->version++;
1815 				}
1816 			}
1817 		} else {
1818 			if (oc->fcount==0) {	// it's serious structure error
1819 				if (mr==0) {
1820 					syslog(LOG_WARNING,"serious structure inconsistency: (chunkid:%016"PRIX64")",ochunkid);
1821 				} else {
1822 					printf("serious structure inconsistency: (chunkid:%016"PRIX64")\n",ochunkid);
1823 				}
1824 				return MFS_ERROR_CHUNKLOST;	// MFS_ERROR_STRUCTURE
1825 			}
1826 			if (mr==0) {
1827 				if (oc->operation!=NONE) {
1828 					return MFS_ERROR_CHUNKBUSY;
1829 				}
1830 				if (cschanges) {
1831 					vc = 0;
1832 					for (os=oc->slisthead ; os ; os=os->next) {
1833 						if (os->valid==VALID) {
1834 							vc++;
1835 						}
1836 					}
1837 					if (vc < sclass_get_keeparch_goal(oc->sclassid,oc->archflag)) {
1838 						return MFS_ERROR_EAGAIN; // just try again later
1839 					}
1840 				}
1841 				i=0;
1842 				for (os=oc->slisthead ;os ; os=os->next) {
1843 					if (os->valid!=INVALID && os->valid!=DEL && os->valid!=WVER && os->valid!=TDWVER) {
1844 						if (c==NULL) {
1845 							c = chunk_new(nextchunkid++);
1846 							c->version = 1;
1847 							c->interrupted = 0;
1848 							c->operation = DUPLICATE;
1849 							chunk_delete_file_int(oc,sclassid,0);
1850 							chunk_add_file_int(c,sclassid);
1851 						}
1852 						s = slist_malloc();
1853 						s->csid = os->csid;
1854 						s->valid = BUSY;
1855 						s->version = c->version;
1856 						s->next = c->slisthead;
1857 						c->slisthead = s;
1858 						c->allvalidcopies++;
1859 						c->regularvalidcopies++;
1860 						stats_chunkops[CHUNK_OP_CHANGE_TRY]++;
1861 						matocsserv_send_duplicatechunk(cstab[s->csid].ptr,c->chunkid,c->version,oc->chunkid,oc->version);
1862 						chunk_addopchunk(s->csid,c->chunkid);
1863 						i++;
1864 					}
1865 				}
1866 				if (c!=NULL) {
1867 					chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,0,c->allvalidcopies,0,c->regularvalidcopies);
1868 				}
1869 				if (i>0) {
1870 					*nchunkid = c->chunkid;
1871 					*opflag=1;
1872 				} else {
1873 					if (csalldata) {
1874 						return MFS_ERROR_CHUNKLOST; // return error
1875 					} else {
1876 						return MFS_ERROR_CSNOTPRESENT; // try again
1877 					}
1878 				}
1879 			} else {
1880 				if (*nchunkid != nextchunkid) {
1881 					return MFS_ERROR_MISMATCH;
1882 				}
1883 				c = chunk_new(nextchunkid++);
1884 				c->version = 1;
1885 				chunk_delete_file_int(oc,sclassid,0);
1886 				chunk_add_file_int(c,sclassid);
1887 				*nchunkid = c->chunkid;
1888 			}
1889 		}
1890 	}
1891 
1892 	c->lockedto = ts+LOCKTIMEOUT;
1893 	chunk_write_counters(c,1);
1894 	return MFS_STATUS_OK;
1895 }
1896 
/* Live (mr==0) variant of chunk_univ_multi_modify() using the current main_time() as timestamp. */
int chunk_multi_modify(uint8_t continueop,uint64_t *nchunkid,uint64_t ochunkid,uint8_t sclassid,uint8_t *opflag,uint32_t clientip) {
	uint32_t now = main_time();

	return chunk_univ_multi_modify(now,0,continueop,nchunkid,ochunkid,sclassid,opflag,clientip);
}
1900 
/*
 * Replay (mr==1) variant of chunk_univ_multi_modify(): timestamp and opflag
 * are supplied by the caller, continueop and clientip are passed as 0.
 */
int chunk_mr_multi_modify(uint32_t ts,uint64_t *nchunkid,uint64_t ochunkid,uint8_t sclassid,uint8_t opflag) {
	uint8_t *opflagptr = &opflag; // the callee takes a pointer; changes stay local to this call

	return chunk_univ_multi_modify(ts,1,0,nchunkid,ochunkid,sclassid,opflagptr,0);
}
1904 
/*
 * Truncates a chunk to 'length': in place (with a version increase) when the
 * chunk is used by a single file, or into a freshly created duplicate when it
 * is shared by several files.
 *
 * ts       - current time; used for lock checks and for the new lock
 * mr       - 0: live operation (sends requests to chunkservers); non-zero:
 *            replay path that only updates local structures and verifies ids
 *            (presumably changelog/metadata restore - confirm with callers)
 * nchunkid - mr==0: receives the id of the resulting chunk;
 *            mr!=0: expected id, verified against the computed one
 * ochunkid - id of the chunk to truncate
 * length   - new length, forwarded to the chunkserver requests
 * sclassid - storage class of the file
 *
 * Returns MFS_STATUS_OK or one of the MFS_ERROR_* codes; on success the
 * resulting chunk is locked until ts+LOCKTIMEOUT.
 */
int chunk_univ_multi_truncate(uint32_t ts,uint8_t mr,uint64_t *nchunkid,uint64_t ochunkid,uint32_t length,uint8_t sclassid) {
	slist *os,*s;
	uint32_t i;
	chunk *oc,*c;
	uint8_t csstable,csalldata;
	uint8_t cschanges;
	uint32_t vc;

	// "stable" = some time after start and no chunkserver registration going on
	if (ts>(starttime+10) && csregisterinprogress==0) {
		csstable = 1;
	} else {
		csstable = 0;
	}

	cschanges = (csstable==0 || (csreceivingchunks&2))?1:0;

	// "all data" decides below between a hard error (CHUNKLOST) and a retryable one
	if (chunk_counters_in_progress()==0 && csdb_have_all_servers()) {
		csalldata = 1;
	} else {
		csalldata = 0;
	}

	c=NULL;
	oc = chunk_find(ochunkid);
	if (oc && mr==0) {
		// non-zero result means the chunk has just been deleted
		if (chunk_remove_disconnected_chunks(oc)) {
			oc = NULL;
		}
	}

	if (oc==NULL) {
		return MFS_ERROR_NOCHUNK;
	}
	if (mr==0 && oc->lockedto>=ts) {
		return MFS_ERROR_LOCKED;
	}
	if (oc->fcount==1) {
		// chunk used by one file only - truncate it in place
		c = oc;
		if (mr==0) {
			*nchunkid = ochunkid;
			if (c->operation!=NONE) {
				return MFS_ERROR_CHUNKBUSY;
			}
			if (cschanges) {
				// servers are still (re)connecting - do not touch undergoal chunks yet
				vc = 0;
				for (os=oc->slisthead ; os ; os=os->next) {
					if (os->valid==VALID) {
						vc++;
					}
				}
				if (vc < sclass_get_keeparch_goal(oc->sclassid,oc->archflag)) {
					return MFS_ERROR_EAGAIN; // just try again later
				}
			}
			// send truncate (with version increase) to every usable copy
			i=0;
			for (s=c->slisthead ;s ; s=s->next) {
				if (s->valid!=INVALID && s->valid!=DEL && s->valid!=WVER && s->valid!=TDWVER) {
					if (s->valid==TDVALID || s->valid==TDBUSY) {
						s->valid = TDBUSY;
					} else {
						s->valid = BUSY;
					}
					s->version = c->version+1;
					stats_chunkops[CHUNK_OP_CHANGE_TRY]++;
					matocsserv_send_truncatechunk(cstab[s->csid].ptr,ochunkid,length,c->version+1,c->version);
					chunk_addopchunk(s->csid,c->chunkid);
					i++;
				}
			}
			if (i>0) {
				c->interrupted = 0;
				c->operation = TRUNCATE;
				c->version++;
			} else {
				if (csalldata) {
					return MFS_ERROR_CHUNKLOST; // return error
				} else {
					return MFS_ERROR_CSNOTPRESENT; // try again
				}
			}
		} else {
			// replay: only verify the id and mirror the version increase
			if (*nchunkid != ochunkid) {
				return MFS_ERROR_MISMATCH;
			}
			c->version++;
		}
	} else {
		if (oc->fcount==0) {	// it's serious structure error
			if (mr==0) {
				syslog(LOG_WARNING,"serious structure inconsistency: (chunkid:%016"PRIX64")",ochunkid);
			} else {
				printf("serious structure inconsistency: (chunkid:%016"PRIX64")\n",ochunkid);
			}
			return MFS_ERROR_CHUNKLOST;	// MFS_ERROR_STRUCTURE
		}
		// chunk shared by several files - truncate into a private duplicate
		if (mr==0) {
			if (oc->operation!=NONE) {
				return MFS_ERROR_CHUNKBUSY;
			}
			if (cschanges) {
				vc = 0;
				for (os=oc->slisthead ; os ; os=os->next) {
					if (os->valid==VALID) {
						vc++;
					}
				}
				if (vc < sclass_get_keeparch_goal(oc->sclassid,oc->archflag)) {
					return MFS_ERROR_EAGAIN; // just try again later
				}
			}
			i=0;
			for (os=oc->slisthead ;os ; os=os->next) {
				if (os->valid!=INVALID && os->valid!=DEL && os->valid!=WVER && os->valid!=TDWVER) {
					if (c==NULL) {
						// the new chunk is created lazily, on the first usable source copy
						c = chunk_new(nextchunkid++);
						c->version = 1;
						c->interrupted = 0;
						c->operation = DUPTRUNC;
						chunk_delete_file_int(oc,sclassid,0);
						chunk_add_file_int(c,sclassid);
					}
					s = slist_malloc();
					s->csid = os->csid;
					s->valid = BUSY;
					s->version = c->version;
					s->next = c->slisthead;
					c->slisthead = s;
					c->allvalidcopies++;
					c->regularvalidcopies++;
					stats_chunkops[CHUNK_OP_CHANGE_TRY]++;
					matocsserv_send_duptruncchunk(cstab[s->csid].ptr,c->chunkid,c->version,oc->chunkid,oc->version,length);
					chunk_addopchunk(s->csid,c->chunkid);
					i++;
				}
			}
			if (c!=NULL) {
				// account for all copies of the new chunk in one step (from 0 to the final counts)
				chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,0,c->allvalidcopies,0,c->regularvalidcopies);
			}
			if (i>0) {
				*nchunkid = c->chunkid;
			} else {
				if (csalldata) {
					return MFS_ERROR_CHUNKLOST; // return error
				} else {
					return MFS_ERROR_CSNOTPRESENT; // try again
				}
			}
		} else {
			// replay: the recorded id has to be the next id we would assign
			if (*nchunkid != nextchunkid) {
				return MFS_ERROR_MISMATCH;
			}
			c = chunk_new(nextchunkid++);
			c->version = 1;
			chunk_delete_file_int(oc,sclassid,0);
			chunk_add_file_int(c,sclassid);
			*nchunkid = c->chunkid;
		}
	}

	c->lockedto=ts+LOCKTIMEOUT;
	return MFS_STATUS_OK;
}
2067 
/*
 * Truncate entry point that stamps the request with the current time.
 * Forwards to chunk_univ_multi_truncate using main_time() as the timestamp
 * and 0 as the second argument (the variant that talks to chunkservers,
 * as opposed to chunk_mr_multi_truncate which passes 1).
 * Returns whatever status the universal routine returns.
 */
int chunk_multi_truncate(uint64_t *nchunkid,uint64_t ochunkid,uint32_t length,uint8_t sclassid) {
	int status;

	status = chunk_univ_multi_truncate(main_time(),0,nchunkid,ochunkid,length,sclassid);
	return status;
}
2071 
/*
 * Truncate entry point taking an explicit timestamp.
 * Forwards to chunk_univ_multi_truncate with the caller supplied 'ts',
 * the second argument set to 1 and the length argument set to 0.
 * Returns whatever status the universal routine returns.
 */
int chunk_mr_multi_truncate(uint32_t ts,uint64_t *nchunkid,uint64_t ochunkid,uint8_t sclassid) {
	int status;

	status = chunk_univ_multi_truncate(ts,1,nchunkid,ochunkid,0,sclassid);
	return status;
}
2075 
chunk_repair(uint8_t sclassid,uint64_t ochunkid,uint32_t * nversion)2076 int chunk_repair(uint8_t sclassid,uint64_t ochunkid,uint32_t *nversion) {
2077 	uint32_t bestversion;
2078 	chunk *c;
2079 	slist *s;
2080 
2081 	*nversion=0;
2082 	if (ochunkid==0) {
2083 		return 0;	// not changed
2084 	}
2085 
2086 	c = chunk_find(ochunkid);
2087 	if (c==NULL) {	// no such chunk - erase (nchunkid already is 0 - so just return with "changed" status)
2088 		return 1;
2089 	}
2090 	if (c->lockedto>=(uint32_t)main_time()) { // can't repair locked chunks - but if it's locked, then likely it doesn't need to be repaired
2091 		return 0;
2092 	}
2093 	chunk_write_counters(c,0);
2094 	bestversion = 0;
2095 	for (s=c->slisthead ; s ; s=s->next) {
2096 		if (cstab[s->csid].valid) {
2097 			if (s->valid == VALID || s->valid == TDVALID || s->valid == BUSY || s->valid == TDBUSY) {	// found chunk that is ok - so return
2098 				return 0;
2099 			}
2100 			if (s->valid == WVER || s->valid == TDWVER) {
2101 				if (s->version>=bestversion) {
2102 					bestversion = s->version;
2103 				}
2104 			}
2105 		}
2106 	}
2107 	if (bestversion==0) {	// didn't find sensible chunk - so erase it
2108 		chunk_delete_file_int(c,sclassid,0);
2109 		return 1;
2110 	}
2111 	if (c->allvalidcopies>0 || c->regularvalidcopies>0) {
2112 		if (c->allvalidcopies>0) {
2113 			syslog(LOG_WARNING,"wrong all valid copies counter - (counter value: %u, should be: 0) - fixed",c->allvalidcopies);
2114 		}
2115 		if (c->regularvalidcopies>0) {
2116 			syslog(LOG_WARNING,"wrong regular valid copies counter - (counter value: %u, should be: 0) - fixed",c->regularvalidcopies);
2117 		}
2118 		chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,0,c->regularvalidcopies,0);
2119 		c->allvalidcopies = 0;
2120 		c->regularvalidcopies = 0;
2121 	}
2122 	c->version = bestversion;
2123 	for (s=c->slisthead ; s ; s=s->next) {
2124 		if (s->version==bestversion && cstab[s->csid].valid) {
2125 			if (s->valid==WVER) {
2126 				s->valid = VALID;
2127 				c->allvalidcopies++;
2128 				c->regularvalidcopies++;
2129 			} else if (s->valid==TDWVER) {
2130 				s->valid = TDVALID;
2131 				c->allvalidcopies++;
2132 			}
2133 		}
2134 	}
2135 	*nversion = bestversion;
2136 	chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,0,c->allvalidcopies,0,c->regularvalidcopies);
2137 	c->needverincrease = 1;
2138 	return 1;
2139 }
2140 
chunk_mr_set_version(uint64_t chunkid,uint32_t version)2141 int chunk_mr_set_version(uint64_t chunkid,uint32_t version) {
2142 	chunk *c;
2143 	c = chunk_find(chunkid);
2144 	if (c==NULL) {
2145 		return MFS_ERROR_NOCHUNK;
2146 	}
2147 	c->version = version;
2148 	meta_version_inc();
2149 	return MFS_STATUS_OK;
2150 }
2151 
2152 /* ---- */
2153 
/* One chunkserver location together with the keys used to order it. */
typedef struct locsort {
	uint32_t ip;
	uint16_t port;
	uint32_t csver;
	uint32_t labelmask;
	uint32_t dist;	// primary sort key
	uint32_t rnd;	// secondary sort key
} locsort;

/*
 * qsort-style comparator for locsort records.
 * Orders ascending by 'dist'; records with equal 'dist' are ordered
 * ascending by 'rnd'. Returns -1, 0 or 1.
 */
int chunk_locsort_cmp(const void *aa,const void *bb) {
	const locsort *lhs = aa;
	const locsort *rhs = bb;

	if (lhs->dist!=rhs->dist) {
		return (lhs->dist<rhs->dist)?-1:1;
	}
	if (lhs->rnd!=rhs->rnd) {
		return (lhs->rnd<rhs->rnd)?-1:1;
	}
	return 0;
}
2177 
chunk_get_version_and_csdata(uint8_t mode,uint64_t chunkid,uint32_t cuip,uint32_t * version,uint8_t * count,uint8_t cs_data[100* 14])2178 uint8_t chunk_get_version_and_csdata(uint8_t mode,uint64_t chunkid,uint32_t cuip,uint32_t *version,uint8_t *count,uint8_t cs_data[100*14]) {
2179 	chunk *c;
2180 	slist *s;
2181 	uint8_t i;
2182 	uint8_t cnt;
2183 	uint8_t *wptr;
2184 	locsort lstab[100];
2185 
2186 	c = chunk_find(chunkid);
2187 	if (c==NULL) {
2188 		return MFS_ERROR_NOCHUNK;
2189 	}
2190 	*version = c->version;
2191 	cnt=0;
2192 	for (s=c->slisthead ;s ; s=s->next) {
2193 		if (s->valid!=INVALID && s->valid!=DEL && s->valid!=WVER && s->valid!=TDWVER && cstab[s->csid].valid) {
2194 			if (cnt<100 && matocsserv_get_csdata(cstab[s->csid].ptr,&(lstab[cnt].ip),&(lstab[cnt].port),&(lstab[cnt].csver),&(lstab[cnt].labelmask))==0) {
2195 				lstab[cnt].dist = topology_distance(lstab[cnt].ip,cuip);	// in the future prepare more sophisticated distance function
2196 				lstab[cnt].rnd = rndu32();
2197 				cnt++;
2198 			}
2199 		}
2200 	}
2201 	if (cnt==0) {
2202 		*count = 0;
2203 		if (chunk_counters_in_progress()==0 && csdb_have_all_servers()) {
2204 			return MFS_ERROR_CHUNKLOST; // this is permanent state - chunk is definitely lost
2205 		} else {
2206 			return MFS_STATUS_OK;
2207 		}
2208 	}
2209 	qsort(lstab,cnt,sizeof(locsort),chunk_locsort_cmp);
2210 	wptr = cs_data;
2211 	for (i=0 ; i<cnt ; i++) {
2212 		put32bit(&wptr,lstab[i].ip);
2213 		put16bit(&wptr,lstab[i].port);
2214 		if (mode>0) {
2215 			put32bit(&wptr,lstab[i].csver);
2216 		}
2217 		if (mode>1) {
2218 			put32bit(&wptr,lstab[i].labelmask);
2219 		}
2220 	}
2221 	*count = cnt;
2222 	return MFS_STATUS_OK;
2223 }
2224 
chunk_get_copies(uint64_t chunkid,uint8_t * count)2225 uint8_t chunk_get_copies(uint64_t chunkid,uint8_t *count) {
2226 	chunk *c;
2227 	slist *s;
2228 	uint8_t cnt;
2229 	uint32_t ip;
2230 	uint16_t port;
2231 
2232 	c = chunk_find(chunkid);
2233 	if (c==NULL) {
2234 		return MFS_ERROR_NOCHUNK;
2235 	}
2236 	cnt = 0;
2237 	for (s=c->slisthead ; s && cnt<100 ; s=s->next) {
2238 		if (cstab[s->csid].valid && s->valid!=DEL) {
2239 			if (matocsserv_get_csdata(cstab[s->csid].ptr,&ip,&port,NULL,NULL)==0) {
2240 				cnt++;
2241 			}
2242 		}
2243 	}
2244 	*count = cnt;
2245 	return MFS_STATUS_OK;
2246 }
2247 
chunk_get_version(uint64_t chunkid,uint32_t * version)2248 uint8_t chunk_get_version(uint64_t chunkid,uint32_t *version) {
2249 	chunk *c;
2250 	c = chunk_find(chunkid);
2251 
2252 	if (c==NULL) {
2253 		*version = 0;
2254 		return MFS_ERROR_NOCHUNK;
2255 	}
2256 	*version = c->version;
2257 	return MFS_STATUS_OK;
2258 }
2259 
chunk_get_version_and_copies(uint64_t chunkid,uint32_t * version,uint8_t * count,uint8_t cs_data[100* 7])2260 uint8_t chunk_get_version_and_copies(uint64_t chunkid,uint32_t *version,uint8_t *count,uint8_t cs_data[100*7]) {
2261 	chunk *c;
2262 	slist *s;
2263 	uint8_t cnt;
2264 	uint32_t ip;
2265 	uint16_t port;
2266 	uint8_t *wptr;
2267 
2268 	c = chunk_find(chunkid);
2269 	if (c==NULL) {
2270 		return MFS_ERROR_NOCHUNK;
2271 	}
2272 	*version = c->version;
2273 	cnt=0;
2274 	wptr = cs_data;
2275 
2276 	for (s=c->slisthead ; s && cnt<100 ; s=s->next) {
2277 		if (cstab[s->csid].valid && s->valid!=DEL) {
2278 			if (matocsserv_get_csdata(cstab[s->csid].ptr,&ip,&port,NULL,NULL)==0) {
2279 				put32bit(&wptr,ip);
2280 				put16bit(&wptr,port);
2281 				if (s->valid==VALID || s->valid==BUSY) {
2282 					put8bit(&wptr,CHECK_VALID);
2283 				} else if (s->valid==TDVALID || s->valid==TDBUSY) {
2284 					put8bit(&wptr,CHECK_MARKEDFORREMOVAL);
2285 				} else if (s->valid==WVER) {
2286 					put8bit(&wptr,CHECK_WRONGVERSION);
2287 				} else if (s->valid==TDWVER) {
2288 					put8bit(&wptr,CHECK_WV_AND_MFR);
2289 				} else {
2290 					put8bit(&wptr,CHECK_INVALID);
2291 				}
2292 				cnt++;
2293 			}
2294 		}
2295 	}
2296 	*count = cnt;
2297 	return MFS_STATUS_OK;
2298 }
2299 
2300 /* ---- */
2301 
chunk_mr_nextchunkid(uint64_t nchunkid)2302 int chunk_mr_nextchunkid(uint64_t nchunkid) {
2303 	if (nchunkid>nextchunkid) {
2304 		nextchunkid=nchunkid;
2305 		meta_version_inc();
2306 		return MFS_STATUS_OK;
2307 	} else {
2308 		return MFS_ERROR_MISMATCH;
2309 	}
2310 }
2311 
chunk_mr_chunkadd(uint32_t ts,uint64_t chunkid,uint32_t version,uint32_t lockedto)2312 int chunk_mr_chunkadd(uint32_t ts,uint64_t chunkid,uint32_t version,uint32_t lockedto) {
2313 	chunk *c;
2314 	c = chunk_find(chunkid);
2315 	if (c) {
2316 		return MFS_ERROR_CHUNKEXIST;
2317 	}
2318 	if (chunkid>nextchunkid+UINT64_C(1000000000)) {
2319 		return MFS_ERROR_MISMATCH;
2320 	}
2321 	if (chunkid>=nextchunkid) {
2322 		nextchunkid=chunkid+1;
2323 	}
2324 	if (lockedto>0 && lockedto<ts) {
2325 		return MFS_ERROR_MISMATCH;
2326 	}
2327 	c = chunk_new(chunkid);
2328 	c->version = version;
2329 	c->lockedto = lockedto;
2330 	meta_version_inc();
2331 	return MFS_STATUS_OK;
2332 }
2333 
chunk_mr_chunkdel(uint32_t ts,uint64_t chunkid,uint32_t version)2334 int chunk_mr_chunkdel(uint32_t ts,uint64_t chunkid,uint32_t version) {
2335 	chunk *c;
2336 	c = chunk_find(chunkid);
2337 	if (c==NULL) {
2338 		return MFS_ERROR_NOCHUNK;
2339 	}
2340 	if (c->version != version) {
2341 		return MFS_ERROR_WRONGVERSION;
2342 	}
2343 	if (c->fcount!=0) {
2344 		return MFS_ERROR_ACTIVE;
2345 	}
2346 	if (c->slisthead!=NULL) {
2347 		return MFS_ERROR_CHUNKBUSY;
2348 	}
2349 	if (c->lockedto>=ts) {
2350 		return MFS_ERROR_LOCKED;
2351 	}
2352 	chunk_delete(c);
2353 	meta_version_inc();
2354 	return MFS_STATUS_OK;
2355 }
2356 
/*
 * Flag servers whose "TD" copies of this chunk are still needed.
 * Counts regular copies (VALID/BUSY) and TD copies (TDVALID/TDBUSY); when
 * there are fewer regular copies than the goal returned by
 * sclass_get_keeparch_goal and at least one TD copy exists, every server
 * holding a TD copy gets mfr_state = REPL_IN_PROGRESS.
 */
static inline void chunk_mfr_state_check(chunk *c) {
	slist *cp;
	uint8_t goal;
	uint8_t regular = 0;
	uint8_t marked = 0;

	goal = sclass_get_keeparch_goal(c->sclassid,c->archflag);
	for (cp=c->slisthead ; cp!=NULL ; cp=cp->next) {
		if (cp->valid==VALID || cp->valid==BUSY) {
			regular++;
		} else if (cp->valid==TDVALID || cp->valid==TDBUSY) {
			marked++;
		}
	}
	if (regular>=goal || marked==0) {
		return;	// goal already met by regular copies, or nothing is marked
	}
	for (cp=c->slisthead ; cp!=NULL ; cp=cp->next) {
		if (cp->valid==TDVALID || cp->valid==TDBUSY) {
			cstab[cp->csid].mfr_state = REPL_IN_PROGRESS;
		}
	}
}
2378 
chunk_server_has_chunk(uint16_t csid,uint64_t chunkid,uint32_t version)2379 void chunk_server_has_chunk(uint16_t csid,uint64_t chunkid,uint32_t version) {
2380 	chunk *c;
2381 	slist *s;
2382 #ifndef MFSDEBUG
2383 	static uint32_t loglastts = 0;
2384 	static uint32_t ilogcount = 0;
2385 	static uint32_t clogcount = 0;
2386 #else
2387 	uint8_t debug;
2388 #endif
2389 
2390 	cstab[csid].newchunkdelay = NEWCHUNKDELAY;
2391 	csreceivingchunks |= 2;
2392 
2393 	c = chunk_find(chunkid);
2394 	if (c) {
2395 		if (chunk_remove_disconnected_chunks(c)) {
2396 			c = NULL;
2397 		}
2398 	}
2399 
2400 	if (c==NULL) {
2401 #ifndef MFSDEBUG
2402 		if (loglastts+60<main_time()) {
2403 			ilogcount=0;
2404 			clogcount=0;
2405 			loglastts = main_time();
2406 		}
2407 #endif
2408 		if (chunkid==0 || chunkid>nextchunkid+UINT64_C(1000000000)) {
2409 #ifndef MFSDEBUG
2410 			if (ilogcount<10) {
2411 #endif
2412 				syslog(LOG_WARNING,"chunkserver (%s) has nonexistent chunk (%016"PRIX64"_%08"PRIX32"), id looks wrong - just ignore it",matocsserv_getstrip(cstab[csid].ptr),chunkid,(version&0x7FFFFFFF));
2413 #ifndef MFSDEBUG
2414 			} else if (ilogcount==10) {
2415 				syslog(LOG_WARNING,"there are more nonexistent chunks to ignore - stop logging");
2416 			}
2417 			ilogcount++;
2418 #endif
2419 			return;
2420 		}
2421 #ifndef MFSDEBUG
2422 		if (clogcount<10) {
2423 #endif
2424 			syslog(LOG_WARNING,"chunkserver (%s) has nonexistent chunk (%016"PRIX64"_%08"PRIX32"), so create it for future deletion",matocsserv_getstrip(cstab[csid].ptr),chunkid,(version&0x7FFFFFFF));
2425 #ifndef MFSDEBUG
2426 		} else if (clogcount==10) {
2427 			syslog(LOG_WARNING,"there are more nonexistent chunks to create - stop logging");
2428 		}
2429 		clogcount++;
2430 #endif
2431 		if (chunkid>=nextchunkid) {
2432 			nextchunkid=chunkid+1;
2433 //			changelog("%"PRIu32"|NEXTCHUNKID(%"PRIu64")",main_time(),nextchunkid);
2434 		}
2435 		c = chunk_new(chunkid);
2436 		c->version = (version&0x7FFFFFFF);
2437 		c->lockedto = (uint32_t)main_time()+UNUSED_DELETE_TIMEOUT;
2438 		changelog("%"PRIu32"|CHUNKADD(%"PRIu64",%"PRIu32",%"PRIu32")",main_time(),c->chunkid,c->version,c->lockedto);
2439 	}
2440 #ifdef MFSDEBUG
2441 	debug = ((chunkid&0xFFF)==0)?1:0;
2442 	if (debug) {
2443 		mfsdebug("chunk_server_has_chunk ; chunkid=%016"PRIX64" new copy: mfrstatus=%u ; csid=%"PRIu16" ; ip=%s",chunkid,(version&0x80000000)?1:0,csid,matocsserv_getstrip(cstab[csid].ptr));
2444 		for (s=c->slisthead ; s ; s=s->next) {
2445 			mfsdebug("chunk_server_has_chunk ; chunkid=%016"PRIX64" ; existing copy: mfrstatus=%u ; valid=%s ; csid=%"PRIu16" ; ip=%s",chunkid,(s->valid==TDVALID || s->valid==TDBUSY || s->valid==TDWVER)?1:0,validstr[s->valid],s->csid,matocsserv_getstrip(cstab[s->csid].ptr));
2446 		}
2447 	}
2448 #endif
2449 	for (s=c->slisthead ; s ; s=s->next) {
2450 		if (s->csid==csid) {
2451 			uint8_t nextallvalidcopies = c->allvalidcopies;
2452 			uint8_t nextregularvalidcopies = c->regularvalidcopies;
2453 			uint8_t nextvalid;
2454 			if (s->valid==INVALID || s->valid==DEL || s->valid==WVER || s->valid==TDWVER) { // ignore such copy
2455 				return;
2456 			}
2457 			// in case of other copies just check 'mark for removal' status
2458 			if (s->valid==BUSY || s->valid==TDBUSY) {
2459 				if (version&0x80000000) {
2460 					nextvalid = TDBUSY;
2461 				} else {
2462 					nextvalid = BUSY;
2463 				}
2464 			} else {
2465 				if (version&0x80000000) {
2466 					nextvalid = TDVALID;
2467 				} else {
2468 					nextvalid = VALID;
2469 				}
2470 			}
2471 			if (s->valid==nextvalid) {
2472 				return;
2473 			}
2474 			if (s->valid==VALID || s->valid==BUSY) {
2475 				nextallvalidcopies--;
2476 				nextregularvalidcopies--;
2477 			} else if (s->valid==TDVALID || s->valid==TDBUSY) {
2478 				nextallvalidcopies--;
2479 			}
2480 			if (nextvalid==VALID || nextvalid==BUSY) {
2481 				nextallvalidcopies++;
2482 				nextregularvalidcopies++;
2483 			} else if (nextvalid==TDVALID || nextvalid==TDBUSY) {
2484 				nextallvalidcopies++;
2485 			}
2486 			s->valid = nextvalid;
2487 			if (nextallvalidcopies!=c->allvalidcopies || nextregularvalidcopies!=c->regularvalidcopies) {
2488 				chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,nextallvalidcopies,c->regularvalidcopies,nextregularvalidcopies);
2489 				c->allvalidcopies = nextallvalidcopies;
2490 				c->regularvalidcopies = nextregularvalidcopies;
2491 				if (version&0x80000000) {
2492 					chunk_mfr_state_check(c);
2493 				}
2494 			}
2495 			return;
2496 		}
2497 	}
2498 	s = slist_malloc();
2499 	s->csid = csid;
2500 	if (c->version!=(version&0x7FFFFFFF)) {
2501 		if (version&0x80000000) {
2502 			s->valid = TDWVER;
2503 		} else {
2504 			s->valid = WVER;
2505 		}
2506 		s->version = version&0x7FFFFFFF;
2507 	} else {
2508 		if (c->writeinprogress) {
2509 			matocsserv_write_counters(cstab[csid].ptr,1);
2510 		}
2511 		if (version&0x80000000) {
2512 			s->valid = TDVALID;
2513 			s->version = c->version;
2514 			chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies+1,c->regularvalidcopies,c->regularvalidcopies);
2515 			c->allvalidcopies++;
2516 		} else {
2517 			s->valid = VALID;
2518 			s->version = c->version;
2519 			chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies+1,c->regularvalidcopies,c->regularvalidcopies+1);
2520 			c->allvalidcopies++;
2521 			c->regularvalidcopies++;
2522 		}
2523 	}
2524 	s->next = c->slisthead;
2525 	c->slisthead = s;
2526 	c->needverincrease = 1;
2527 	if (version&0x80000000) {
2528 		chunk_mfr_state_check(c);
2529 	}
2530 }
2531 
chunk_damaged(uint16_t csid,uint64_t chunkid)2532 void chunk_damaged(uint16_t csid,uint64_t chunkid) {
2533 	chunk *c;
2534 	slist *s;
2535 	c = chunk_find(chunkid);
2536 	if (c==NULL) {
2537 		if (chunkid>nextchunkid+UINT64_C(1000000000)) {
2538 			syslog(LOG_WARNING,"chunkserver has nonexistent chunk (%016"PRIX64"), id looks wrong - just ignore it",chunkid);
2539 			return;
2540 		}
2541 		syslog(LOG_WARNING,"chunkserver has nonexistent chunk (%016"PRIX64"), so create it for future deletion",chunkid);
2542 		if (chunkid>=nextchunkid) {
2543 			nextchunkid=chunkid+1;
2544 //			changelog("%"PRIu32"|NEXTCHUNKID(%"PRIu64")",main_time(),nextchunkid);
2545 		}
2546 		c = chunk_new(chunkid);
2547 		c->version = 0;
2548 		changelog("%"PRIu32"|CHUNKADD(%"PRIu64",%"PRIu32",%"PRIu32")",main_time(),c->chunkid,c->version,c->lockedto);
2549 	}
2550 	for (s=c->slisthead ; s ; s=s->next) {
2551 		if (s->csid==csid) {
2552 			if (s->valid==TDBUSY || s->valid==TDVALID) {
2553 				chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies-1,c->regularvalidcopies,c->regularvalidcopies);
2554 				c->allvalidcopies--;
2555 			}
2556 			if (s->valid==BUSY || s->valid==VALID) {
2557 				chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies-1,c->regularvalidcopies,c->regularvalidcopies-1);
2558 				c->allvalidcopies--;
2559 				c->regularvalidcopies--;
2560 			}
2561 			if (c->writeinprogress && s->valid!=INVALID && s->valid!=DEL && s->valid!=WVER && s->valid!=TDWVER) {
2562 				matocsserv_write_counters(cstab[csid].ptr,0);
2563 			}
2564 			s->valid = INVALID;
2565 			s->version = 0;
2566 			c->needverincrease = 1;
2567 			chunk_priority_queue_check(c,1);
2568 			chunk_mfr_state_check(c);
2569 			return;
2570 		}
2571 	}
2572 	s = slist_malloc();
2573 	s->csid = csid;
2574 	s->valid = INVALID;
2575 	s->version = 0;
2576 	s->next = c->slisthead;
2577 	c->needverincrease = 1;
2578 	c->slisthead = s;
2579 }
2580 
chunk_lost(uint16_t csid,uint64_t chunkid)2581 void chunk_lost(uint16_t csid,uint64_t chunkid) {
2582 	chunk *c;
2583 	slist **sptr,*s;
2584 
2585 	c = chunk_find(chunkid);
2586 	if (c==NULL) {
2587 		return;
2588 	}
2589 	sptr=&(c->slisthead);
2590 	while ((s=*sptr)) {
2591 		if (s->csid==csid) {
2592 			if (s->valid==TDBUSY || s->valid==TDVALID) {
2593 				chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies-1,c->regularvalidcopies,c->regularvalidcopies);
2594 				c->allvalidcopies--;
2595 			}
2596 			if (s->valid==BUSY || s->valid==VALID) {
2597 				chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies-1,c->regularvalidcopies,c->regularvalidcopies-1);
2598 				c->allvalidcopies--;
2599 				c->regularvalidcopies--;
2600 			}
2601 			if (c->writeinprogress && s->valid!=INVALID && s->valid!=DEL && s->valid!=WVER && s->valid!=TDWVER) {
2602 				matocsserv_write_counters(cstab[csid].ptr,0);
2603 			}
2604 			c->needverincrease = 1;
2605 			*sptr = s->next;
2606 			slist_free(s);
2607 
2608 			cstab[csid].lostchunkdelay = LOSTCHUNKDELAY;
2609 			csreceivingchunks |= 1;
2610 
2611 		} else {
2612 			sptr = &(s->next);
2613 		}
2614 	}
2615 	if (c->lockedto<(uint32_t)main_time() && c->slisthead==NULL && c->fcount==0 && c->ondangerlist==0 && chunk_counters_in_progress()==0 && csdb_have_all_servers()) {
2616 		changelog("%"PRIu32"|CHUNKDEL(%"PRIu64",%"PRIu32")",main_time(),c->chunkid,c->version);
2617 		chunk_delete(c);
2618 	} else {
2619 		chunk_priority_queue_check(c,1);
2620 		chunk_mfr_state_check(c);
2621 	}
2622 }
2623 
chunk_get_mfrstatus(uint16_t csid)2624 uint8_t chunk_get_mfrstatus(uint16_t csid) {
2625 	if (csid<MAXCSCOUNT) {
2626 		switch (cstab[csid].mfr_state) {
2627 			case UNKNOWN_HARD:
2628 			case UNKNOWN_SOFT:
2629 				return 0;
2630 			case CAN_BE_REMOVED:
2631 				return 2;
2632 			case REPL_IN_PROGRESS:
2633 			case WAS_IN_PROGRESS:
2634 				return 1;
2635 
2636 		}
2637 	}
2638 	return 0;
2639 }
2640 
chunk_server_remove_csid(uint16_t csid)2641 static inline void chunk_server_remove_csid(uint16_t csid) {
2642 	// remove from used list
2643 	if (cstab[csid].prev<MAXCSCOUNT) {
2644 		cstab[cstab[csid].prev].next = cstab[csid].next;
2645 	} else {
2646 		csusedhead = cstab[csid].next;
2647 	}
2648 	if (cstab[csid].next<MAXCSCOUNT) {
2649 		cstab[cstab[csid].next].prev = cstab[csid].prev;
2650 	}
2651 	// append to free list
2652 	cstab[csid].next = MAXCSCOUNT;
2653 	cstab[csid].prev = csfreetail;
2654 	cstab[csfreetail].next = csid;
2655 	csfreetail = csid;
2656 
2657 //	cstab[csid].next = csfreehead;
2658 //	cstab[csid].prev = MAXCSCOUNT;
2659 //	cstab[csfreehead].prev = csid;
2660 //	csfreehead = csid;
2661 }
2662 
chunk_server_new_csid(void)2663 static inline uint16_t chunk_server_new_csid(void) {
2664 	uint16_t csid;
2665 
2666 	// take first element from free list
2667 	csid = csfreehead;
2668 	csfreehead = cstab[csid].next;
2669 	cstab[csfreehead].prev = MAXCSCOUNT;
2670 	// add to used list
2671 	if (csusedhead<MAXCSCOUNT) {
2672 		cstab[csusedhead].prev = csid;
2673 	}
2674 	cstab[csid].next = csusedhead;
2675 //	cstab[csid].prev = MAXCSCOUNT; // not necessary - it was first element in free list
2676 	csusedhead = csid;
2677 	return csid;
2678 }
2679 
chunk_server_check_delays(void)2680 static inline void chunk_server_check_delays(void) {
2681 	uint16_t csid;
2682 
2683 	csreceivingchunks = 0;
2684 	for (csid = csusedhead ; csid < MAXCSCOUNT ; csid = cstab[csid].next) {
2685 		if (cstab[csid].newchunkdelay>0) {
2686 			cstab[csid].newchunkdelay--;
2687 			csreceivingchunks |= 2;
2688 		}
2689 		if (cstab[csid].lostchunkdelay>0) {
2690 			cstab[csid].lostchunkdelay--;
2691 			csreceivingchunks |= 1;
2692 		}
2693 	}
2694 }
2695 
chunk_server_connected(void * ptr)2696 uint16_t chunk_server_connected(void *ptr) {
2697 	uint16_t csid;
2698 
2699 	csid = chunk_server_new_csid();
2700 	cstab[csid].ptr = ptr;
2701 	cstab[csid].opchunks = NULL;
2702 	cstab[csid].valid = 1;
2703 	cstab[csid].registered = 0;
2704 	cstab[csid].mfr_state = UNKNOWN_HARD;
2705 	csregisterinprogress += 1;
2706 	return csid;
2707 }
2708 
chunk_server_register_end(uint16_t csid)2709 void chunk_server_register_end(uint16_t csid) {
2710 	if (cstab[csid].registered==0 && cstab[csid].valid==1) {
2711 		cstab[csid].registered = 1;
2712 		csregisterinprogress -= 1;
2713 	}
2714 	if (csregisterinprogress==0) {
2715 		matoclserv_fuse_invalidate_chunk_cache();
2716 	}
2717 }
2718 
chunk_server_disconnected(uint16_t csid)2719 void chunk_server_disconnected(uint16_t csid) {
2720 	discserv *ds;
2721 	csopchunk *csop;
2722 	chunk *c;
2723 
2724 	ds = malloc(sizeof(discserv));
2725 	ds->csid = csid;
2726 	ds->next = discservers_next;
2727 	discservers_next = ds;
2728 	fs_cs_disconnected();
2729 	cstab[csid].valid = 0;
2730 	if (cstab[csid].registered==0) {
2731 		csregisterinprogress -= 1;
2732 	}
2733 	csop = cstab[csid].opchunks;
2734 	while (csop) {
2735 		c = chunk_find(csop->chunkid);
2736 		if (c) {
2737 			chunk_remove_disconnected_chunks(c);
2738 		}
2739 		csop = csop->next;
2740 		if (opsinprogress>0) {
2741 			opsinprogress--;
2742 		}
2743 	}
2744 	cstab[csid].opchunks = NULL;
2745 
2746 	for (csid = csusedhead ; csid < MAXCSCOUNT ; csid = cstab[csid].next) {
2747 		cstab[csid].mfr_state = UNKNOWN_HARD;
2748 	}
2749 	matoclserv_fuse_invalidate_chunk_cache();
2750 }
2751 
chunk_server_disconnection_loop(void)2752 void chunk_server_disconnection_loop(void) {
2753 	uint32_t i;
2754 	chunk *c,*cn;
2755 	discserv *ds;
2756 	uint64_t startutime,currutime;
2757 	static uint32_t discserverspos = 0;
2758 
2759 	if (discservers) {
2760 		startutime = monotonic_useconds();
2761 		currutime = startutime;
2762 		while (startutime+10000>currutime) {
2763 			for (i=0 ; i<100 ; i++) {
2764 				if (discserverspos<chunkrehashpos) {
2765 					for (c=chunkhashtab[discserverspos>>HASHTAB_LOBITS][discserverspos&HASHTAB_MASK] ; c ; c=cn ) {
2766 						cn = c->next;
2767 						chunk_remove_disconnected_chunks(c);
2768 					}
2769 					discserverspos++;
2770 				} else {
2771 					while (discservers) {
2772 						ds = discservers;
2773 						discservers = ds->next;
2774 						chunk_server_remove_csid(ds->csid);
2775 						matocsserv_disconnection_finished(cstab[ds->csid].ptr);
2776 						free(ds);
2777 					}
2778 					return;
2779 				}
2780 			}
2781 			currutime = monotonic_useconds();
2782 		}
2783 	} else if (discservers_next) {
2784 		discservers = discservers_next;
2785 		discservers_next = NULL;
2786 		discserverspos = 0;
2787 	}
2788 }
2789 
chunk_got_delete_status(uint16_t csid,uint64_t chunkid,uint8_t status)2790 void chunk_got_delete_status(uint16_t csid,uint64_t chunkid,uint8_t status) {
2791 	chunk *c;
2792 	slist *s,**st;
2793 
2794 	if (status==MFS_STATUS_OK || status==MFS_ERROR_NOCHUNK) {
2795 		stats_chunkops[CHUNK_OP_DELETE_OK]++;
2796 	} else {
2797 		stats_chunkops[CHUNK_OP_DELETE_ERR]++;
2798 	}
2799 
2800 	c = chunk_find(chunkid);
2801 	if (c==NULL) {
2802 		return ;
2803 	}
2804 	if (status!=MFS_STATUS_OK && status!=MFS_ERROR_NOCHUNK) { // treat here MFS_ERROR_NOCHUNK as ok
2805 		return ;
2806 	}
2807 	st = &(c->slisthead);
2808 	while (*st) {
2809 		s = *st;
2810 		if (s->csid == csid) {
2811 			if (s->valid!=DEL) {
2812 				if (s->valid==TDBUSY || s->valid==TDVALID) {
2813 					chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies-1,c->regularvalidcopies,c->regularvalidcopies);
2814 					c->allvalidcopies--;
2815 				}
2816 				if (s->valid==BUSY || s->valid==VALID) {
2817 					chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies-1,c->regularvalidcopies,c->regularvalidcopies-1);
2818 					c->allvalidcopies--;
2819 					c->regularvalidcopies--;
2820 				}
2821 				if (c->writeinprogress && s->valid!=INVALID && s->valid!=DEL && s->valid!=WVER && s->valid!=TDWVER) {
2822 					matocsserv_write_counters(cstab[s->csid].ptr,0);
2823 				}
2824 				syslog(LOG_WARNING,"got unexpected delete status");
2825 			}
2826 			*st = s->next;
2827 			slist_free(s);
2828 		} else {
2829 			st = &(s->next);
2830 		}
2831 	}
2832 	if (c->lockedto<(uint32_t)main_time() && c->slisthead==NULL && c->fcount==0 && c->ondangerlist==0 && chunk_counters_in_progress()==0 && csdb_have_all_servers()) {
2833 		changelog("%"PRIu32"|CHUNKDEL(%"PRIu64",%"PRIu32")",main_time(),c->chunkid,c->version);
2834 		chunk_delete(c);
2835 	}
2836 }
2837 
chunk_got_replicate_status(uint16_t csid,uint64_t chunkid,uint32_t version,uint8_t status)2838 void chunk_got_replicate_status(uint16_t csid,uint64_t chunkid,uint32_t version,uint8_t status) {
2839 	chunk *c;
2840 	slist *s,**st;
2841 	uint8_t fix;
2842 
2843 	if (status==MFS_STATUS_OK) {
2844 		stats_chunkops[CHUNK_OP_REPLICATE_OK]++;
2845 	} else {
2846 		stats_chunkops[CHUNK_OP_REPLICATE_ERR]++;
2847 	}
2848 
2849 	c = chunk_find(chunkid);
2850 	if (c==NULL) {
2851 		return ;
2852 	}
2853 	if (c->operation==REPLICATE) { // high priority replication
2854 		fix = 0;
2855 		if (status!=0) { // chunk hasn't been replicated (error occurred) - simply remove it from copies
2856 			st = &(c->slisthead);
2857 			while (*st) {
2858 				s = *st;
2859 				if (s->csid==csid && s->valid==BUSY) {
2860 					chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies-1,c->regularvalidcopies,c->regularvalidcopies-1);
2861 					c->allvalidcopies--;
2862 					c->regularvalidcopies--;
2863 					if (c->writeinprogress) {
2864 						matocsserv_write_counters(cstab[s->csid].ptr,0);
2865 					}
2866 					fix = 1;
2867 					*st = s->next;
2868 					chunk_delopchunk(s->csid,c->chunkid);
2869 					slist_free(s);
2870 				} else {
2871 					st = &(s->next);
2872 				}
2873 			}
2874 		}
2875 		if (fix==0) {
2876 			for (s=c->slisthead ; s ; s=s->next) {
2877 				if (s->csid == csid) {
2878 					if (s->valid!=BUSY) {
2879 						syslog(LOG_WARNING,"got replication status from server not set as busy !!!");
2880 					}
2881 					if (status!=0 || version!=c->version) {
2882 						if (s->valid==TDBUSY || s->valid==TDVALID) {
2883 							chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies-1,c->regularvalidcopies,c->regularvalidcopies);
2884 							c->allvalidcopies--;
2885 						}
2886 						if (s->valid==BUSY || s->valid==VALID) {
2887 							chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies-1,c->regularvalidcopies,c->regularvalidcopies-1);
2888 							c->allvalidcopies--;
2889 							c->regularvalidcopies--;
2890 						}
2891 						if (c->writeinprogress && s->valid!=INVALID && s->valid!=DEL && s->valid!=WVER && s->valid!=TDWVER) {
2892 							matocsserv_write_counters(cstab[s->csid].ptr,0);
2893 						}
2894 						s->valid = INVALID;
2895 						s->version = 0;	// after unfinished operation can't be sure what version chunk has
2896 					} else {
2897 						if (s->valid == BUSY || s->valid == VALID) {
2898 							s->valid = VALID;
2899 						}
2900 					}
2901 					chunk_delopchunk(s->csid,c->chunkid);
2902 				} else if (s->valid==BUSY) {
2903 					syslog(LOG_WARNING,"got replication status from one server, but another is set as busy !!!");
2904 				}
2905 			}
2906 		}
2907 		c->operation = NONE;
2908 		c->lockedto = 0;
2909 		matoclserv_chunk_unlocked(c->chunkid,c);
2910 	} else { // low priority replication
2911 		if (status!=0) {
2912 			chunk_priority_queue_check(c,1);
2913 			return ;
2914 		}
2915 		for (s=c->slisthead ; s ; s=s->next) {
2916 			if (s->csid == csid) {
2917 				syslog(LOG_WARNING,"got replication status from server which had had that chunk before (chunk:%016"PRIX64"_%08"PRIX32")",chunkid,version);
2918 				if (s->valid==VALID && version!=c->version) {
2919 					chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies-1,c->regularvalidcopies,c->regularvalidcopies-1);
2920 					c->allvalidcopies--;
2921 					c->regularvalidcopies--;
2922 					s->valid = INVALID;
2923 					s->version = version;
2924 					if (c->writeinprogress) {
2925 						matocsserv_write_counters(cstab[s->csid].ptr,0);
2926 					}
2927 				}
2928 				chunk_priority_queue_check(c,1);
2929 				return;
2930 			}
2931 		}
2932 		s = slist_malloc();
2933 		s->csid = csid;
2934 		if (c->lockedto>=(uint32_t)main_time() || version!=c->version) {
2935 			s->valid = INVALID;
2936 		} else {
2937 			chunk_write_counters(c,0);
2938 			chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies+1,c->regularvalidcopies,c->regularvalidcopies+1);
2939 			c->allvalidcopies++;
2940 			c->regularvalidcopies++;
2941 			s->valid = VALID;
2942 		}
2943 		s->version = version;
2944 		s->next = c->slisthead;
2945 		c->slisthead = s;
2946 	}
2947 	chunk_priority_queue_check(c,1);
2948 }
2949 
2950 
chunk_operation_status(uint64_t chunkid,uint8_t status,uint16_t csid,uint8_t operation)2951 void chunk_operation_status(uint64_t chunkid,uint8_t status,uint16_t csid,uint8_t operation) {
2952 	chunk *c;
2953 	uint8_t opfinished,validcopies;
2954 	uint8_t verfixed;
2955 	slist *s,**st;
2956 
2957 	if (status==MFS_STATUS_OK) {
2958 		if (operation==CREATE) {
2959 			stats_chunkops[CHUNK_OP_CREATE_OK]++;
2960 		} else {
2961 			stats_chunkops[CHUNK_OP_CHANGE_OK]++;
2962 		}
2963 	} else {
2964 		if (operation==CREATE) {
2965 			stats_chunkops[CHUNK_OP_CREATE_ERR]++;
2966 		} else {
2967 			stats_chunkops[CHUNK_OP_CHANGE_ERR]++;
2968 		}
2969 	}
2970 
2971 	c = chunk_find(chunkid);
2972 	if (c==NULL) {
2973 		return ;
2974 	}
2975 
2976 	if (chunk_remove_disconnected_chunks(c)) {
2977 		return;
2978 	}
2979 //	for (s=c->slisthead ; s ; s=s->next) {
2980 //		if (!cstab[s->csid].valid) {
2981 //			c->interrupted = 1;
2982 //		}
2983 //	}
2984 
2985 	if (c->operation!=operation && operation!=NONE) {
2986 		uint8_t eop,sop;
2987 		eop = c->operation;
2988 		sop = operation;
2989 		if (eop>7) {
2990 			eop=7;
2991 		}
2992 		if (sop>7) {
2993 			sop=7;
2994 		}
2995 		syslog(LOG_WARNING,"chunk %016"PRIX64"_%08"PRIX32" - got unexpected status (expected: %s ; got: %s)",chunkid,c->version,opstr[eop],opstr[sop]);
2996 	}
2997 
2998 	validcopies = 0;
2999 	opfinished = 1;
3000 
3001 	st = &(c->slisthead);
3002 	while ((s=*st)!=NULL) {
3003 		if (s->csid == csid) {
3004 			if (status!=0) {
3005 				c->interrupted = 1;	// increase version after finish, just in case
3006 				if (s->valid==TDBUSY || s->valid==TDVALID) {
3007 					chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies-1,c->regularvalidcopies,c->regularvalidcopies);
3008 					c->allvalidcopies--;
3009 				}
3010 				if (s->valid==BUSY || s->valid==VALID) {
3011 					chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies-1,c->regularvalidcopies,c->regularvalidcopies-1);
3012 					c->allvalidcopies--;
3013 					c->regularvalidcopies--;
3014 				}
3015 				if (c->writeinprogress && s->valid!=INVALID && s->valid!=DEL && s->valid!=WVER && s->valid!=TDWVER) {
3016 					 matocsserv_write_counters(cstab[s->csid].ptr,0);
3017 				}
3018 				if (status==MFS_ERROR_NOTDONE) { // special case - this operation was not even started, so we know exact result
3019 					if (c->operation==SET_VERSION || c->operation==TRUNCATE) { // chunk left not changed, but now it has wrong version
3020 						if (s->valid==TDBUSY || s->valid==TDVALID) {
3021 							s->valid = TDWVER;
3022 						} else {
3023 							s->valid = WVER;
3024 						}
3025 						s->version--;
3026 					} else if (c->operation==CREATE || c->operation==DUPLICATE || c->operation==DUPTRUNC) { // copy not created
3027 						*st = s->next;
3028 						slist_free(s);
3029 						continue;
3030 					}
3031 				} else {
3032 					s->valid = INVALID;
3033 					s->version = 0;	// after unfinished operation can't be sure what version chunk has
3034 				}
3035 			} else {
3036 				if (s->valid==TDBUSY || s->valid==TDVALID) {
3037 					s->valid=TDVALID;
3038 				} else {
3039 					s->valid=VALID;
3040 				}
3041 			}
3042 			chunk_statusopchunk(s->csid,c->chunkid,status);
3043 //			chunk_delopchunk(s->csid,c->chunkid);
3044 		}
3045 		if (s->valid==BUSY || s->valid==TDBUSY) {
3046 			opfinished=0;
3047 		}
3048 		if (s->valid==VALID || s->valid==TDVALID) {
3049 			validcopies=1;
3050 		}
3051 		st = &(s->next);
3052 	}
3053 	if (opfinished && validcopies==0 && (c->operation==SET_VERSION || c->operation==TRUNCATE)) { // we know that version increase was just not completed, so all WVER chunks with version exactly one lower than chunk version are actually VALID copies
3054 		verfixed = 0;
3055 		for (s=c->slisthead ; s!=NULL ; s=s->next) {
3056 			if (s->version+1==c->version) {
3057 				if (s->valid==TDWVER) {
3058 					verfixed = 1;
3059 					s->valid = TDVALID;
3060 					chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies+1,c->regularvalidcopies,c->regularvalidcopies);
3061 					c->allvalidcopies++;
3062 				} else if (s->valid==WVER) {
3063 					verfixed = 1;
3064 					s->valid = VALID;
3065 					chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies+1,c->regularvalidcopies,c->regularvalidcopies+1);
3066 					c->allvalidcopies++;
3067 					c->regularvalidcopies++;
3068 				}
3069 			}
3070 			if (verfixed) {
3071 				c->version--;
3072 				changelog("%"PRIu32"|SETVERSION(%"PRIu64",%"PRIu32")",(uint32_t)main_time(),c->chunkid,c->version);
3073 			}
3074 		}
3075 		// we continue because we still want to return status not done to matoclserv module
3076 	}
3077 	if (opfinished) {
3078 		uint8_t nospace;
3079 		nospace = 1;
3080 		for (s=c->slisthead ; s ; s=s->next) {
3081 			status = chunk_delopchunk(s->csid,chunkid);
3082 			if (status!=MFS_ERROR_MISMATCH && status!=MFS_ERROR_NOSPACE) {
3083 				nospace = 0;
3084 			}
3085 		}
3086 		if (validcopies) {
3087 //			syslog(LOG_NOTICE,"operation finished, chunk: %016"PRIX64" ; op: %s ; interrupted: %u",c->chunkid,opstr[c->operation],c->interrupted);
3088 			if (c->interrupted) {
3089 				chunk_emergency_increase_version(c);
3090 			} else {
3091 				c->operation = NONE;
3092 				c->needverincrease = 0;
3093 				matoclserv_chunk_status(c->chunkid,MFS_STATUS_OK);
3094 				if (c->lockedto==0) {
3095 					matoclserv_chunk_unlocked(c->chunkid,c);
3096 				}
3097 			}
3098 		} else {
3099 			if (nospace) {
3100 				matoclserv_chunk_status(c->chunkid,MFS_ERROR_NOSPACE);
3101 			} else {
3102 				matoclserv_chunk_status(c->chunkid,MFS_ERROR_NOTDONE);
3103 			}
3104 			c->operation = NONE;
3105 		}
3106 	}
3107 }
3108 
chunk_got_chunkop_status(uint16_t csid,uint64_t chunkid,uint8_t status)3109 void chunk_got_chunkop_status(uint16_t csid,uint64_t chunkid,uint8_t status) {
3110 	chunk_operation_status(chunkid,status,csid,NONE);
3111 }
3112 
chunk_got_create_status(uint16_t csid,uint64_t chunkid,uint8_t status)3113 void chunk_got_create_status(uint16_t csid,uint64_t chunkid,uint8_t status) {
3114 	chunk_operation_status(chunkid,status,csid,CREATE);
3115 }
3116 
chunk_got_duplicate_status(uint16_t csid,uint64_t chunkid,uint8_t status)3117 void chunk_got_duplicate_status(uint16_t csid,uint64_t chunkid,uint8_t status) {
3118 	chunk_operation_status(chunkid,status,csid,DUPLICATE);
3119 }
3120 
chunk_got_setversion_status(uint16_t csid,uint64_t chunkid,uint8_t status)3121 void chunk_got_setversion_status(uint16_t csid,uint64_t chunkid,uint8_t status) {
3122 	chunk_operation_status(chunkid,status,csid,SET_VERSION);
3123 }
3124 
chunk_got_truncate_status(uint16_t csid,uint64_t chunkid,uint8_t status)3125 void chunk_got_truncate_status(uint16_t csid,uint64_t chunkid,uint8_t status) {
3126 	chunk_operation_status(chunkid,status,csid,TRUNCATE);
3127 }
3128 
chunk_got_duptrunc_status(uint16_t csid,uint64_t chunkid,uint8_t status)3129 void chunk_got_duptrunc_status(uint16_t csid,uint64_t chunkid,uint8_t status) {
3130 	chunk_operation_status(chunkid,status,csid,DUPTRUNC);
3131 }
3132 
chunk_no_more_pending_jobs(void)3133 uint8_t chunk_no_more_pending_jobs(void) {
3134 	return (opsinprogress==0)?1:0;
3135 }
3136 
3137 /* ----------------------- */
3138 /* JOBS (DELETE/REPLICATE) */
3139 /* ----------------------- */
3140 
chunk_store_info(uint8_t * buff)3141 void chunk_store_info(uint8_t *buff) {
3142 	put32bit(&buff,chunksinfo_loopstart);
3143 	put32bit(&buff,chunksinfo_loopend);
3144 	put32bit(&buff,chunksinfo.done.del_invalid);
3145 	put32bit(&buff,chunksinfo.notdone.del_invalid);
3146 	put32bit(&buff,chunksinfo.done.del_unused);
3147 	put32bit(&buff,chunksinfo.notdone.del_unused);
3148 	put32bit(&buff,chunksinfo.done.del_diskclean);
3149 	put32bit(&buff,chunksinfo.notdone.del_diskclean);
3150 	put32bit(&buff,chunksinfo.done.del_overgoal);
3151 	put32bit(&buff,chunksinfo.notdone.del_overgoal);
3152 	put32bit(&buff,chunksinfo.done.copy_undergoal);
3153 	put32bit(&buff,chunksinfo.notdone.copy_undergoal);
3154 	put32bit(&buff,chunksinfo.done.copy_wronglabels);
3155 	put32bit(&buff,chunksinfo.notdone.copy_wronglabels);
3156 	put32bit(&buff,chunksinfo.copy_rebalance);
3157 	put32bit(&buff,chunksinfo.labels_dont_match);
3158 	put32bit(&buff,chunksinfo.locked_unused);
3159 	put32bit(&buff,chunksinfo.locked_used);
3160 }
3161 
chunk_mindist(uint16_t csid,uint32_t ip[255],uint8_t ipcnt)3162 static inline uint8_t chunk_mindist(uint16_t csid,uint32_t ip[255],uint8_t ipcnt) {
3163 	uint8_t mindist,dist,k;
3164 	uint32_t sip;
3165 	mindist = TOPOLOGY_DIST_MAX;
3166 	if (matocsserv_get_csdata(cstab[csid].ptr,&sip,NULL,NULL,NULL)==0) {
3167 		for (k=0 ; k<ipcnt && mindist>0 ; k++) {
3168 			dist=topology_distance(sip,ip[k]);
3169 			if (dist<mindist) {
3170 				mindist = dist;
3171 			}
3172 		}
3173 	}
3174 	return mindist;
3175 }
3176 
3177 // first servers in the same rack (server id), then other (yes, same rack is better than same physical server, so order is 1 and then 0 or 2)
chunk_rack_sort(uint16_t servers[MAXCSCOUNT],uint16_t servcount,uint32_t ip[255],uint8_t ipcnt)3178 static inline void chunk_rack_sort(uint16_t servers[MAXCSCOUNT],uint16_t servcount,uint32_t ip[255],uint8_t ipcnt) {
3179 	uint16_t i,j;
3180 	uint16_t csid;
3181 	uint8_t mindist;
3182 
3183 	if (servcount==0 || ipcnt==0) {
3184 		return;
3185 	}
3186 	i = 0;
3187 	j = servcount-1;
3188 	while (i<j) {
3189 		while (i<j) {
3190 			mindist = chunk_mindist(servers[i],ip,ipcnt);
3191 			if (mindist!=TOPOLOGY_DIST_SAME_RACKID) {
3192 				break;
3193 			}
3194 			i++;
3195 		}
3196 		while (i<j) {
3197 			mindist = chunk_mindist(servers[j],ip,ipcnt);
3198 			if (mindist==TOPOLOGY_DIST_SAME_RACKID) {
3199 				break;
3200 			}
3201 			j--;
3202 		}
3203 		if (i<j) {
3204 			csid = servers[i];
3205 			servers[i] = servers[j];
3206 			servers[j] = csid;
3207 		}
3208 	}
3209 }
3210 
/*
 * Chooses source chunkserver for replication of chunk 'c' to server 'dstcsid'.
 *
 * c       - chunk to be replicated
 * dstcsid - id of destination chunkserver (index in cstab)
 * now     - current time, passed to matocsserv_replication_read_counter
 * lclass  - index in MaxReadRepl (limit of simultaneous read replications)
 * rgvc    - range used for random choice among VALID copies (non-topology mode)
 * rgtdc   - range used for random choice among TDVALID copies (non-topology mode)
 *           NOTE(review): presumably numbers of VALID / TDVALID copies that can be used
 *           as a source; function assumes that rgvc+rgtdc>0 - confirm in caller
 *
 * Only VALID and TDVALID copies on servers with read replication counter below the limit
 * are taken into account. VALID copies are always preferred over TDVALID ones.
 * Returns id of chosen server or MAXCSCOUNT when there is no usable source.
 */
static inline uint16_t chunk_get_undergoal_replicate_srccsid(chunk *c,uint16_t dstcsid,uint32_t now,uint32_t lclass,uint32_t rgvc,uint32_t rgtdc) {
	slist *s;
	uint32_t r = 0;
	uint16_t srccsid = MAXCSCOUNT;

	if (ReplicationsRespectTopology) {
		uint32_t min_dist = 0xFFFFFFFF;
		uint32_t dist;
		uint32_t ip;
		uint32_t cuip;
		uint32_t mdrgvc = 0;	// number of VALID candidates in minimal distance
		uint32_t mdrgtdc = 0;	// number of TDVALID candidates in minimal distance

		if (matocsserv_get_csdata(cstab[dstcsid].ptr,&cuip,NULL,NULL,NULL)==0) {
			// first pass: find minimal distance, count candidates in that distance and remember
			// default choice (a VALID copy if there is any in minimal distance)
			for (s=c->slisthead ; s ; s=s->next) {
				if (matocsserv_replication_read_counter(cstab[s->csid].ptr,now)<MaxReadRepl[lclass] && (s->valid==VALID || s->valid==TDVALID)) {
					if (matocsserv_get_csdata(cstab[s->csid].ptr,&ip,NULL,NULL,NULL)==0) {
						dist=topology_distance(ip,cuip);
						if (min_dist>=dist) {
							if (min_dist>dist) {
								min_dist=dist;
								srccsid=s->csid;
								mdrgvc = 0;
								mdrgtdc = 0;
							} else if (s->valid==VALID) {
								srccsid=s->csid;
							}
							if (s->valid==VALID) {
								mdrgvc++;
							} else {
								mdrgtdc++;
							}
						}
					}
				}
			}
			// when there is more than one candidate of chosen kind then pick r-th of them
			if (mdrgvc > 1) {
				r = 1+rndu32_ranged(mdrgvc);
			} else if (mdrgvc == 0 && mdrgtdc > 1) {  // we have to choose TDVALID chunks
				r = 1+rndu32_ranged(mdrgtdc);
			}
			// second pass: count only candidates of the kind 'r' was drawn for
			for (s=c->slisthead ; s && r>0 ; s=s->next) {
				if (matocsserv_replication_read_counter(cstab[s->csid].ptr,now)<MaxReadRepl[lclass] && (s->valid==VALID || s->valid==TDVALID)) {
					if (matocsserv_get_csdata(cstab[s->csid].ptr,&ip,NULL,NULL,NULL)==0) {
						dist=topology_distance(ip,cuip);
						if (min_dist==dist) {
							if (mdrgvc > 1 && s->valid==VALID) {
								r--;
								srccsid=s->csid;
							} else if (mdrgvc == 0 && mdrgtdc > 1 && s->valid==TDVALID) {
								// TDVALID copies are used only when there are no VALID candidates;
								// otherwise they would consume 'r' (drawn from VALID range) and
								// could be chosen instead of available VALID copy
								r--;
								srccsid=s->csid;
							}
						}
					}
				}
			}
		}
	} else {
		if (rgvc>0) {	// if there are VALID copies then make copy of one VALID chunk
			r = 1+rndu32_ranged(rgvc);
			for (s=c->slisthead ; s && r>0 ; s=s->next) {
				if (matocsserv_replication_read_counter(cstab[s->csid].ptr,now)<MaxReadRepl[lclass] && s->valid==VALID) {
					r--;
					srccsid = s->csid;
				}
			}
		} else {	// if not then use TDVALID chunks.
			r = 1+rndu32_ranged(rgtdc);
			for (s=c->slisthead ; s && r>0 ; s=s->next) {
				if (matocsserv_replication_read_counter(cstab[s->csid].ptr,now)<MaxReadRepl[lclass] && s->valid==TDVALID) {
					r--;
					srccsid = s->csid;
				}
			}
		}
	}

	return srccsid;
}
3291 
/*
 * Starts replication of chunk 'c' to chunkserver 'dstcsid'.
 *
 * Source server is chosen by chunk_get_undergoal_replicate_srccsid. On success new BUSY
 * copy is added to the chunk, chunk is marked as being replicated (and locked for
 * LOCKTIMEOUT) and proper counter in 'inforec' is increased: copy_undergoal for
 * priority<5, copy_wronglabels otherwise.
 *
 * Returns:
 *   -1 - no usable source server,
 *    1 - replicate command could not be sent,
 *    0 - replication started.
 */
static inline int chunk_undergoal_replicate(chunk *c,uint16_t dstcsid,uint32_t now,uint32_t lclass,uint16_t priority,loop_info *inforec,uint32_t rgvc,uint32_t rgtdc) {
	slist *newcopy;
	uint16_t source;

	source = chunk_get_undergoal_replicate_srccsid(c,dstcsid,now,lclass,rgvc,rgtdc);
	if (source==MAXCSCOUNT) {
		return -1;
	}

	// attempt is counted even if sending of the command fails
	stats_chunkops[CHUNK_OP_REPLICATE_TRY]++;
	if (matocsserv_send_replicatechunk(cstab[dstcsid].ptr,c->chunkid,c->version,cstab[source].ptr)<0) {
		syslog(LOG_WARNING,"chunk %016"PRIX64"_%08"PRIX32": error sending replicate command",c->chunkid,c->version);
		return 1;
	}

	chunk_addopchunk(dstcsid,c->chunkid);
	c->operation = REPLICATE;
	c->lockedto = now+LOCKTIMEOUT;

	// destination copy is treated as BUSY (counted as valid) until status arrives
	newcopy = slist_malloc();
	newcopy->csid = dstcsid;
	newcopy->valid = BUSY;
	newcopy->version = c->version;
	newcopy->next = c->slisthead;
	c->slisthead = newcopy;

	chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies+1,c->regularvalidcopies,c->regularvalidcopies+1);
	c->allvalidcopies++;
	c->regularvalidcopies++;

	if (priority>=5) {
		inforec->done.copy_wronglabels++;
	} else {
		inforec->done.copy_undergoal++;
	}
	return 0;
}
3325 
chunk_do_jobs(chunk * c,uint16_t scount,uint16_t fullservers,uint32_t now,uint8_t extrajob)3326 void chunk_do_jobs(chunk *c,uint16_t scount,uint16_t fullservers,uint32_t now,uint8_t extrajob) {
3327 	slist *s;
3328 	static uint16_t *dcsids = NULL;
3329 	static uint16_t dservcount;
3330 //	static uint16_t *bcsids;
3331 //	static uint16_t bservcount;
3332 	static uint16_t *rcsids = NULL;
3333 	uint16_t servmaxpos[4];
3334 	uint16_t rservcount;
3335 	uint16_t srccsid,dstcsid;
3336 	double repl_read_counter;
3337 	uint16_t i,j,k;
3338 	uint16_t prilevel;
3339 	uint32_t vc,tdc,ivc,bc,tdb,dc,wvc,tdw;
3340 	uint32_t goal;
3341 	static loop_info inforec;
3342 	static uint32_t delnotdone;
3343 	static uint32_t deldone;
3344 	static uint32_t prevtodeletecount;
3345 	static uint32_t delloopcnt;
3346 	uint32_t **labelmasks;
3347 	uint32_t labelcnt;
3348 	static uint16_t *servers = NULL;
3349 	uint32_t servcnt,extraservcnt;
3350 	int32_t *matching;
3351 	uint32_t forcereplication;
3352 	uint32_t vrip[255];
3353 	uint8_t vripcnt;
3354 #ifdef MFSDEBUG
3355 	uint8_t debug;
3356 #endif
3357 
3358 	if (servers==NULL) {
3359 		servers = malloc(sizeof(uint16_t)*MAXCSCOUNT);
3360 		passert(servers);
3361 	}
3362 	if (rcsids==NULL) {
3363 		rcsids = malloc(sizeof(uint16_t)*MAXCSCOUNT);
3364 		passert(rcsids);
3365 	}
3366 	if (dcsids==NULL) {
3367 		dcsids = malloc(sizeof(uint16_t)*MAXCSCOUNT);
3368 		passert(dcsids);
3369 	}
3370 //	if (bcsids==NULL) {
3371 //		bcsids = malloc(sizeof(uint16_t)*MAXCSCOUNT);
3372 //		passert(bcsids);
3373 //	}
3374 
3375 	if (c==NULL) {
3376 		if (scount==JOBS_INIT) { // init tasks
3377 			delnotdone = 0;
3378 			deldone = 0;
3379 			prevtodeletecount = 0;
3380 			delloopcnt = 0;
3381 			memset(&inforec,0,sizeof(loop_info));
3382 			dservcount = 0;
3383 		} else if (scount==JOBS_EVERYLOOP) { // every loop tasks
3384 			delloopcnt++;
3385 			if (delloopcnt>=16) {
3386 				uint32_t todeletecount = deldone+delnotdone;
3387 				delloopcnt=0;
3388 				if ((delnotdone > deldone) && (todeletecount > prevtodeletecount)) {
3389 					TmpMaxDelFrac *= 1.5;
3390 					if (TmpMaxDelFrac>MaxDelHardLimit) {
3391 						syslog(LOG_NOTICE,"DEL_LIMIT hard limit (%"PRIu32" per server) reached",MaxDelHardLimit);
3392 						TmpMaxDelFrac=MaxDelHardLimit;
3393 					}
3394 					TmpMaxDel = TmpMaxDelFrac;
3395 					syslog(LOG_NOTICE,"DEL_LIMIT temporary increased to: %"PRIu32" per server",TmpMaxDel);
3396 				}
3397 				if ((todeletecount < prevtodeletecount) && (TmpMaxDelFrac > MaxDelSoftLimit)) {
3398 					TmpMaxDelFrac /= 1.5;
3399 					if (TmpMaxDelFrac<MaxDelSoftLimit) {
3400 						syslog(LOG_NOTICE,"DEL_LIMIT back to soft limit (%"PRIu32" per server)",MaxDelSoftLimit);
3401 						TmpMaxDelFrac = MaxDelSoftLimit;
3402 					}
3403 					TmpMaxDel = TmpMaxDelFrac;
3404 					syslog(LOG_NOTICE,"DEL_LIMIT decreased back to: %"PRIu32" per server",TmpMaxDel);
3405 				}
3406 				prevtodeletecount = todeletecount;
3407 				delnotdone = 0;
3408 				deldone = 0;
3409 			}
3410 			chunksinfo = inforec;
3411 			memset(&inforec,0,sizeof(inforec));
3412 			chunksinfo_loopstart = chunksinfo_loopend;
3413 			chunksinfo_loopend = now;
3414 		} else if (scount==JOBS_EVERYTICK) { // every second tasks
3415 			dservcount = 0;
3416 //			bservcount=0;
3417 		} else if (scount==JOBS_TERM) {
3418 			if (servers!=NULL) {
3419 				free(servers);
3420 			}
3421 			if (rcsids!=NULL) {
3422 				free(rcsids);
3423 			}
3424 			if (dcsids!=NULL) {
3425 				free(dcsids);
3426 			}
3427 		}
3428 		return;
3429 	}
3430 
3431 // step 0. remove all disconnected copies from structures
3432 	if (chunk_remove_disconnected_chunks(c)) {
3433 		return;
3434 	}
3435 
3436 	if (c->lockedto < now) {
3437 		chunk_write_counters(c,0);
3438 	}
3439 
3440 // step 1. calculate number of valid and invalid copies
3441 	vc = 0;
3442 	tdc = 0;
3443 	ivc = 0;
3444 	bc = 0;
3445 	tdb = 0;
3446 	dc = 0;
3447 	wvc = 0;
3448 	tdw = 0;
3449 	for (s=c->slisthead ; s ; s=s->next) {
3450 		switch (s->valid) {
3451 		case INVALID:
3452 			ivc++;
3453 			break;
3454 		case TDVALID:
3455 			tdc++;
3456 			break;
3457 		case VALID:
3458 			vc++;
3459 			break;
3460 		case TDBUSY:
3461 			tdb++;
3462 			break;
3463 		case BUSY:
3464 			bc++;
3465 			break;
3466 		case DEL:
3467 			dc++;
3468 			break;
3469 		case WVER:
3470 			wvc++;
3471 			break;
3472 		case TDWVER:
3473 			tdw++;
3474 			break;
3475 		}
3476 	}
3477 #ifdef MFSDEBUG
3478 	debug = ((c->chunkid&0xFFF)==0)?1:0;
3479 	if (debug) {
3480 		mfsdebug("chunk_do_jobs ; chunkid=%016"PRIX64" ; vc=%u ; tdc=%u ; bc=%u ; tdb=%u ; wvc=%u ; tdw=%u ; ivc=%u ; dc=%u",c->chunkid,vc,tdc,bc,tdb,wvc,tdw,ivc,dc);
3481 		for (s=c->slisthead ; s ; s=s->next) {
3482 			mfsdebug("chunk_do_jobs ; chunkid=%016"PRIX64" ; existing copy: valid=%s ; csid=%"PRIu16" ; ip=%s",c->chunkid,validstr[s->valid],s->csid,matocsserv_getstrip(cstab[s->csid].ptr));
3483 		}
3484 	}
3485 #endif
3486 	if (c->allvalidcopies!=vc+tdc+bc+tdb) {
3487 		syslog(LOG_WARNING,"chunk %016"PRIX64"_%08"PRIX32": wrong all valid copies counter - (counter value: %u, should be: %u) - fixed",c->chunkid,c->version,c->allvalidcopies,vc+tdc+bc+tdb);
3488 		chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,vc+tdc+bc+tdb,c->regularvalidcopies,c->regularvalidcopies);
3489 		c->allvalidcopies = vc+tdc+bc+tdb;
3490 	}
3491 	if (c->regularvalidcopies!=vc+bc) {
3492 		syslog(LOG_WARNING,"chunk %016"PRIX64"_%08"PRIX32": wrong regular valid copies counter - (counter value: %u, should be: %u) - fixed",c->chunkid,c->version,c->regularvalidcopies,vc+bc);
3493 		chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies,c->regularvalidcopies,vc+bc);
3494 		c->regularvalidcopies = vc+bc;
3495 	}
3496 	if (tdb+bc==0 && c->operation!=NONE) {
3497 		syslog(LOG_WARNING,"chunk %016"PRIX64"_%08"PRIX32": chunk in middle of operation %s, but no chunk server is busy - finish operation",c->chunkid,c->version,opstr[c->operation]);
3498 		c->operation = NONE;
3499 	}
3500 	goal = sclass_get_keeparch_goal(c->sclassid,c->archflag);
3501 	if (vc + bc < goal && tdc + tdb > 0) {
3502 		for (s=c->slisthead ; s ; s=s->next) {
3503 			if (s->valid == TDVALID || s->valid == TDBUSY) {
3504 				cstab[s->csid].mfr_state = REPL_IN_PROGRESS;
3505 			}
3506 		}
3507 	}
3508 	if (c->lockedto < now) {
3509 		if (tdb+bc>0 && c->operation==NONE) {
3510 			if (tdc+vc>0) {
3511 				syslog(LOG_WARNING,"chunk %016"PRIX64"_%08"PRIX32": unexpected BUSY copies - fixing",c->chunkid,c->version);
3512 				for (s=c->slisthead ; s ; s=s->next) {
3513 					if (s->valid == BUSY) {
3514 						chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies-1,c->regularvalidcopies,c->regularvalidcopies-1);
3515 						c->allvalidcopies--;
3516 						c->regularvalidcopies--;
3517 						s->valid = INVALID;
3518 						s->version = 0;
3519 						ivc++;
3520 						bc--;
3521 					} else if (s->valid == TDBUSY) {
3522 						chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies-1,c->regularvalidcopies,c->regularvalidcopies);
3523 						c->allvalidcopies--;
3524 						s->valid = INVALID;
3525 						s->version = 0;
3526 						ivc++;
3527 						tdb--;
3528 					}
3529 				}
3530 			} else {
3531 				syslog(LOG_WARNING,"chunk %016"PRIX64"_%08"PRIX32": unexpected BUSY copies - can't fix",c->chunkid,c->version);
3532 			}
3533 		}
3534 
3535 /*
3536 #warning debug
3537 	{
3538 		static FILE *fd = NULL;
3539 		static uint32_t cnt = 0;
3540 		if (fd==NULL) {
3541 			fd = fopen("chunklog.txt","a");
3542 		}
3543 		if (fd!=NULL) {
3544 			fprintf(fd,"chunk %016"PRIX64": ivc=%"PRIu32" , dc=%"PRIu32" , vc=%"PRIu32" , bc=%"PRIu32" , wvc=%"PRIu32" , tdv=%"PRIu32" , tdb=%"PRIu32" , tdw=%"PRIu32" , goal=%"PRIu8" , scount=%"PRIu16"\n",c->chunkid,ivc,dc,vc,bc,wvc,tdc,tdb,tdw,c->goal,scount);
3545 			cnt++;
3546 		}
3547 		if (cnt>100000) {
3548 			fclose(fd);
3549 			fd = NULL;
3550 			cnt = 0;
3551 		}
3552 	}
3553 */
3554 //	syslog(LOG_WARNING,"chunk %016"PRIX64": ivc=%"PRIu32" , dc=%"PRIu32" , vc=%"PRIu32" , bc=%"PRIu32" , wvc=%"PRIu32" , tdv=%"PRIu32" , tdb=%"PRIu32" , tdw=%"PRIu32" , goal=%"PRIu8" , scount=%"PRIu16,c->chunkid,ivc,dc,vc,bc,wvc,tdc,tdb,tdw,c->goal,scount);
3555 
3556 // step 2. check number of copies
3557 		if (tdc+vc+tdb+bc==0 && wvc+tdw>0 && c->fcount>0/* c->flisthead */) {
3558 			if ((tdw+wvc)>=goal) {
3559 				uint32_t bestversion;
3560 				bestversion = 0;
3561 				for (s=c->slisthead ; s ; s=s->next) {
3562 					if (s->valid==WVER || s->valid==TDWVER) {
3563 						if (s->version>=bestversion) {
3564 							bestversion = s->version;
3565 						}
3566 					}
3567 				}
3568 				if (bestversion>0 && ((bestversion+1)==c->version || c->version+1==bestversion)) {
3569 					syslog(LOG_WARNING,"chunk %016"PRIX64" has only invalid copies (%"PRIu32") - fixing it",c->chunkid,wvc+tdw);
3570 					c->version = bestversion;
3571 					for (s=c->slisthead ; s ; s=s->next) {
3572 						if (s->version==bestversion && cstab[s->csid].valid) {
3573 							if (s->valid == WVER) {
3574 								s->valid = VALID;
3575 								c->allvalidcopies++;
3576 								c->regularvalidcopies++;
3577 							} else if (s->valid == TDWVER) {
3578 								s->valid = TDVALID;
3579 								c->allvalidcopies++;
3580 							}
3581 						}
3582 						if (s->valid == WVER || s->valid == TDWVER) {
3583 							syslog(LOG_NOTICE,"chunk %016"PRIX64"_%08"PRIX32" - wrong versioned copy on (%s - ver:%08"PRIX32")",c->chunkid,c->version,matocsserv_getstrip(cstab[s->csid].ptr),s->version);
3584 						} else {
3585 							syslog(LOG_NOTICE,"chunk %016"PRIX64"_%08"PRIX32" - valid copy on (%s - ver:%08"PRIX32")",c->chunkid,c->version,matocsserv_getstrip(cstab[s->csid].ptr),s->version);
3586 						}
3587 					}
3588 					chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,0,c->allvalidcopies,0,c->regularvalidcopies);
3589 					c->needverincrease = 1;
3590 					return;
3591 				}
3592 			}
3593 		}
3594 		if (tdc+vc+tdb+bc==0 && wvc+tdw+ivc>0 && c->fcount>0) {
3595 			if (wvc+tdw==0) {
3596 				syslog(LOG_WARNING,"chunk %016"PRIX64" has only invalid copies (%"PRIu32") - please repair it manually",c->chunkid,ivc);
3597 			} else {
3598 				syslog(LOG_WARNING,"chunk %016"PRIX64" has only copies with wrong versions (%"PRIu32") - please repair it manually",c->chunkid,wvc+tdw);
3599 			}
3600 			for (s=c->slisthead ; s ; s=s->next) {
3601 				if (s->valid==INVALID) {
3602 					syslog(LOG_NOTICE,"chunk %016"PRIX64"_%08"PRIX32" - invalid copy on (%s - ver:%08"PRIX32")",c->chunkid,c->version,matocsserv_getstrip(cstab[s->csid].ptr),s->version);
3603 				} else {
3604 					syslog(LOG_NOTICE,"chunk %016"PRIX64"_%08"PRIX32" - copy with wrong version on (%s - ver:%08"PRIX32")",c->chunkid,c->version,matocsserv_getstrip(cstab[s->csid].ptr),s->version);
3605 				}
3606 			}
3607 			return;
3608 		}
3609 
3610 		if (tdc+vc+tdb+bc+ivc+dc+wvc+tdw==0 && c->fcount>0) {
3611 			syslog(LOG_NOTICE,"chunk %016"PRIX64"_%08"PRIX32": there are no copies",c->chunkid,c->version);
3612 			return;
3613 		}
3614 	}
3615 
3616 // step 3.0. delete invalid copies
3617 	if (extrajob==0 && (tdc+vc+tdb+bc>0 || (c->fcount==0 && c->lockedto<now))) {
3618 		for (s=c->slisthead ; s ; s=s->next) {
3619 			if (matocsserv_deletion_counter(cstab[s->csid].ptr)<TmpMaxDel) {
3620 				if (s->valid==WVER || s->valid==TDWVER || s->valid==INVALID || s->valid==DEL) {
3621 					if (s->valid==DEL) {
3622 						syslog(LOG_WARNING,"chunk %016"PRIX64"_%08"PRIX32": chunk hasn't been deleted since previous loop - retry",c->chunkid,c->version);
3623 					}
3624 					switch (s->valid) {
3625 						case WVER:
3626 							wvc--;
3627 							break;
3628 						case TDWVER:
3629 							tdw--;
3630 							break;
3631 						case INVALID:
3632 							ivc--;
3633 							break;
3634 						case DEL:
3635 							dc--;
3636 							break;
3637 					}
3638 					s->valid = DEL;
3639 					dc++;
3640 					stats_chunkops[CHUNK_OP_DELETE_TRY]++;
3641 					matocsserv_send_deletechunk(cstab[s->csid].ptr,c->chunkid,0);
3642 					inforec.done.del_invalid++;
3643 					deldone++;
3644 				}
3645 			} else {
3646 				if (s->valid==WVER || s->valid==TDWVER || s->valid==INVALID) {
3647 					inforec.notdone.del_invalid++;
3648 					delnotdone++;
3649 				}
3650 			}
3651 		}
3652 	}
3653 
3654 // step 3.1. check for unfinished replications
3655 	if (extrajob == 0) {
3656 		if (c->operation==REPLICATE && c->lockedto<now) {
3657 			syslog(LOG_WARNING,"chunk %016"PRIX64"_%08"PRIX32": chunk hasn't been replicated since previous loop - retry",c->chunkid,c->version);
3658 			for (s=c->slisthead ; s ; s=s->next) {
3659 				if (s->valid==TDBUSY || s->valid==BUSY) {
3660 					if (s->valid==TDBUSY) {
3661 						chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies-1,c->regularvalidcopies,c->regularvalidcopies);
3662 						c->allvalidcopies--;
3663 						tdb--;
3664 					}
3665 					if (s->valid==BUSY) {
3666 						chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies-1,c->regularvalidcopies,c->regularvalidcopies-1);
3667 						c->allvalidcopies--;
3668 						c->regularvalidcopies--;
3669 						bc--;
3670 					}
3671 					s->valid = INVALID;
3672 					s->version = 0;	// after unfinished operation can't be sure what version chunk has
3673 					chunk_delopchunk(s->csid,c->chunkid);
3674 					ivc++;
3675 				}
3676 			}
3677 			c->operation = NONE;
3678 			c->lockedto = 0;
3679 			matoclserv_chunk_unlocked(c->chunkid,c);
3680 		}
3681 	}
3682 
3683 // step 4. return if chunk is during some operation
3684 	if (c->operation!=NONE || (c->lockedto>=now)) {
3685 		if (extrajob == 0) {
3686 			if (c->fcount==0) {
3687 				inforec.locked_unused++;
3688 			} else {
3689 				inforec.locked_used++;
3690 				if (goal > vc+bc && vc+tdc+bc+tdb > 0) {
3691 					if (c->operation!=NONE) {
3692 						syslog(LOG_NOTICE,"chunk %016"PRIX64"_%08"PRIX32": can't replicate chunk - operation %s in progress",c->chunkid,c->version,opstr[c->operation]);
3693 					} else {
3694 						syslog(LOG_NOTICE,"chunk %016"PRIX64"_%08"PRIX32": can't replicate chunk - chunk is being modified (locked for next %"PRIu32" second%s)",c->chunkid,c->version,(uint32_t)(1+c->lockedto-now),(c->lockedto==now)?"":"s");
3695 					}
3696 				}
3697 			}
3698 		}
3699 		return ;
3700 	}
3701 
3702 //	assert(c->lockedto < now && c->writeinprogress == 0)
3703 
3704 // step 5. check busy count
3705 	if ((bc+tdb)>0) {
3706 		syslog(LOG_WARNING,"chunk %016"PRIX64"_%08"PRIX32" has unexpected BUSY copies",c->chunkid,c->version);
3707 		return ;
3708 	}
3709 
3710 // step 6. delete unused chunk
3711 
3712 	if (extrajob==0 && c->fcount==0/* c->flisthead==NULL */) {
3713 //		syslog(LOG_WARNING,"unused - delete");
3714 		for (s=c->slisthead ; s ; s=s->next) {
3715 			if (matocsserv_deletion_counter(cstab[s->csid].ptr)<TmpMaxDel) {
3716 				if (s->valid==VALID || s->valid==TDVALID) {
3717 					if (s->valid==TDVALID) {
3718 						chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies-1,c->regularvalidcopies,c->regularvalidcopies);
3719 						c->allvalidcopies--;
3720 					} else {
3721 						chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies-1,c->regularvalidcopies,c->regularvalidcopies-1);
3722 						c->allvalidcopies--;
3723 						c->regularvalidcopies--;
3724 					}
3725 					c->needverincrease = 1;
3726 					s->valid = DEL;
3727 					stats_chunkops[CHUNK_OP_DELETE_TRY]++;
3728 					matocsserv_send_deletechunk(cstab[s->csid].ptr,c->chunkid,c->version);
3729 					inforec.done.del_unused++;
3730 					deldone++;
3731 				}
3732 			} else {
3733 				if (s->valid==VALID || s->valid==TDVALID) {
3734 					inforec.notdone.del_unused++;
3735 					delnotdone++;
3736 				}
3737 			}
3738 		}
3739 		return ;
3740 	}
3741 
3742 // step 7.0. if chunk has enough valid copies and more than one copy with wrong version then delete all copies with wrong version
3743 	if (extrajob==0 && vc >= goal && wvc>0) {
3744 		for (s=c->slisthead ; s ; s=s->next) {
3745 			if (s->valid == WVER) {
3746 				if (matocsserv_deletion_counter(cstab[s->csid].ptr)<TmpMaxDel) {
3747 					s->valid = DEL;
3748 					stats_chunkops[CHUNK_OP_DELETE_TRY]++;
3749 					matocsserv_send_deletechunk(cstab[s->csid].ptr,c->chunkid,0);
3750 					wvc--;
3751 					dc++;
3752 				}
3753 			}
3754 		}
3755 	}
3756 
3757 // step 7.1. if chunk has too many copies then delete some of them
3758 	if (vc > goal) {
3759 		uint8_t prevdone;
3760 		if (dservcount==0) {
3761 			// dservcount = matocsserv_getservers_ordered(dcsids,AcceptableDifference/2.0,NULL,NULL);
3762 			dservcount = matocsserv_getservers_ordered(dcsids);
3763 		}
3764 	//	syslog(LOG_WARNING,"vc (%"PRIu32") > goal (%"PRIu32") - delete",vc,sclass_getgoal(c->sclassid));
3765 		inforec.notdone.del_overgoal+=(vc-goal);
3766 		delnotdone+=(vc-goal);
3767 		prevdone = 1;
3768 
3769 		if ((DoNotUseSameIP | DoNotUseSameRack | LabelUniqueMask) || sclass_has_keeparch_labels(c->sclassid,c->archflag)) { // labels version
3770 			servcnt = 0;
3771 			for (i=0 ; i<dservcount ; i++) {
3772 				for (s=c->slisthead ; s && s->csid!=dcsids[dservcount-1-i] ; s=s->next) {}
3773 				if (s && s->valid==VALID) {
3774 					servers[servcnt++] = s->csid;
3775 				}
3776 			}
3777 			labelcnt = sclass_get_keeparch_labelmasks(c->sclassid,c->archflag,&labelmasks);
3778 			matching = do_perfect_match(labelcnt,servcnt,labelmasks,servers);
3779 			for (i=0 ; i<servcnt && vc>goal && prevdone ; i++) {
3780 				if (matching[i+labelcnt]<0) {
3781 					for (s=c->slisthead ; s && s->csid!=servers[i] ; s=s->next) {}
3782 					if (s && s->valid==VALID) {
3783 						if (matocsserv_deletion_counter(cstab[s->csid].ptr)<TmpMaxDel) {
3784 							chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies-1,c->regularvalidcopies,c->regularvalidcopies-1);
3785 							c->allvalidcopies--;
3786 							c->regularvalidcopies--;
3787 							c->needverincrease = 1;
3788 							s->valid = DEL;
3789 							stats_chunkops[CHUNK_OP_DELETE_TRY]++;
3790 							matocsserv_send_deletechunk(cstab[s->csid].ptr,c->chunkid,0);
3791 							inforec.done.del_overgoal++;
3792 							inforec.notdone.del_overgoal--;
3793 							deldone++;
3794 							delnotdone--;
3795 							vc--;
3796 							dc++;
3797 						} else {
3798 							prevdone=0;
3799 							chunk_priority_enqueue(DPRIORITY_OVERGOAL,c); // in such case only enqueue this chunk for future processing
3800 						}
3801 					}
3802 				}
3803 			}
3804 		} else { // classic goal version
3805 			for (i=0 ; i<dservcount && vc>goal && prevdone; i++) {
3806 				for (s=c->slisthead ; s && s->csid!=dcsids[dservcount-1-i] ; s=s->next) {}
3807 				if (s && s->valid==VALID) {
3808 					if (matocsserv_deletion_counter(cstab[s->csid].ptr)<TmpMaxDel) {
3809 						chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies-1,c->regularvalidcopies,c->regularvalidcopies-1);
3810 						c->allvalidcopies--;
3811 						c->regularvalidcopies--;
3812 						c->needverincrease = 1;
3813 						s->valid = DEL;
3814 						stats_chunkops[CHUNK_OP_DELETE_TRY]++;
3815 						matocsserv_send_deletechunk(cstab[s->csid].ptr,c->chunkid,0);
3816 						inforec.done.del_overgoal++;
3817 						inforec.notdone.del_overgoal--;
3818 						deldone++;
3819 						delnotdone--;
3820 						vc--;
3821 						dc++;
3822 					} else {
3823 						prevdone=0;
3824 						chunk_priority_enqueue(DPRIORITY_OVERGOAL,c); // in such case only enqueue this chunk for future processing
3825 					}
3826 				}
3827 			}
3828 		}
3829 		return;
3830 	}
3831 
3832 // step 7.2. if chunk has one copy on each server and some of them have status TDVALID then delete them
3833 	if (extrajob==0 && vc+tdc>=scount && vc<goal && tdc>0 && vc+tdc>1 && chunks_priority_leng[DPRIORITY_ENDANGERED_HIGHGOAL]==0 && chunks_priority_leng[DPRIORITY_ENDANGERED]==0 && chunks_priority_leng[DPRIORITY_UNDERGOAL_MFR]==0) {
3834 		uint8_t tdcr = 0;
3835 		for (s=c->slisthead ; s ; s=s->next) {
3836 			if (s->valid==TDVALID) {
3837 				if (matocsserv_has_avail_space(cstab[s->csid].ptr)) {
3838 					tdcr++;
3839 				}
3840 			}
3841 		}
3842 		if (vc+tdcr>=scount) {
3843 			uint8_t prevdone;
3844 	//		syslog(LOG_WARNING,"vc+tdc (%"PRIu32") >= scount (%"PRIu32") and vc (%"PRIu32") < goal (%"PRIu32") and tdc (%"PRIu32") > 0 and vc+tdc > 1 - delete",vc+tdc,scount,vc,goal,tdc);
3845 			prevdone = 0;
3846 			for (s=c->slisthead ; s && prevdone==0 ; s=s->next) {
3847 				if (s->valid==TDVALID) {
3848 					if (matocsserv_has_avail_space(cstab[s->csid].ptr) && matocsserv_deletion_counter(cstab[s->csid].ptr)<TmpMaxDel) {
3849 						chunk_state_change(c->sclassid,c->sclassid,c->archflag,c->archflag,c->allvalidcopies,c->allvalidcopies-1,c->regularvalidcopies,c->regularvalidcopies);
3850 						c->allvalidcopies--;
3851 						c->needverincrease = 1;
3852 						s->valid = DEL;
3853 						stats_chunkops[CHUNK_OP_DELETE_TRY]++;
3854 						matocsserv_send_deletechunk(cstab[s->csid].ptr,c->chunkid,0);
3855 						inforec.done.del_diskclean++;
3856 						tdc--;
3857 						dc++;
3858 						prevdone = 1;
3859 					} else {
3860 						inforec.notdone.del_diskclean++;
3861 					}
3862 				}
3863 			}
3864 			if (prevdone) {
3865 				return;
3866 			}
3867 		}
3868 	}
3869 
3870 // step 8. check matching for labeled chunks
3871 	forcereplication = 0;
3872 	if ((DoNotUseSameIP | DoNotUseSameRack | LabelUniqueMask) || sclass_has_keeparch_labels(c->sclassid,c->archflag)) {
3873 		servcnt = 0;
3874 		for (s=c->slisthead ; s ; s=s->next) {
3875 			if (s->valid==VALID) {
3876 				servers[servcnt++] = s->csid;
3877 			}
3878 		}
3879 		labelcnt = sclass_get_keeparch_labelmasks(c->sclassid,c->archflag,&labelmasks);
3880 		matching = do_perfect_match(labelcnt,servcnt,labelmasks,servers);
3881 		for (i=0 ; i<labelcnt ; i++) {
3882 			if (matching[i]<0) { // there are unmatched labels
3883 				forcereplication++;
3884 			}
3885 		}
3886 	}
3887 
3888 // step 9. if chunk has number of copies less than goal then make another copy of this chunk
3889 	if ((forcereplication || goal > vc) && vc+tdc > 0) {
3890 		uint8_t canbefixed;
3891 		canbefixed = 1;
3892 		if (csdb_replicate_undergoals()) {
3893 //		if ((csdb_getdisconnecttime()+ReplicationsDelayDisconnect)<now) {
3894 			uint32_t rgvc,rgtdc;
3895 			uint32_t lclass;
3896 
3897 			if (vc+tdc==1 && goal>2) { // highest priority - chunks with only one copy and high goal
3898 				prilevel = DPRIORITY_ENDANGERED_HIGHGOAL;
3899 				lclass = 0;
3900 			} else if (vc+tdc==1 && goal==2) { // next priority - chunks with only one copy
3901 				prilevel = DPRIORITY_ENDANGERED;
3902 				lclass = 0;
3903 			} else if (vc==1 && tdc>0) { // next priority - chunks on one regular disk and some "marked for removal" disks
3904 				prilevel = DPRIORITY_UNDERGOAL_MFR;
3905 				lclass = 1;
3906 			} else if (tdc>0) { // next priority - chunks on "marked for removal" disks
3907 				prilevel = DPRIORITY_MFR;
3908 				lclass = 1;
3909 			} else if (vc < goal) { // next priority - standard undergoal chunks
3910 				prilevel = DPRIORITY_UNDERGOAL;
3911 				lclass = 1;
3912 			} else { // lowest priority - wrong labeled chunks
3913 				prilevel = DPRIORITY_WRONGLABEL;
3914 				lclass = 1;
3915 			}
3916 			if (extrajob==0) {
3917 				for (i=0 ; i<prilevel ; i++) {
3918 					if (chunks_priority_leng[i]>0) { // we have chunks with higher priority than current chunk
3919 						chunk_priority_enqueue(prilevel,c); // in such case only enqueue this chunk for future processing
3920 						if (prilevel<DPRIORITY_WRONGLABEL) {
3921 							inforec.notdone.copy_undergoal++;
3922 						} else {
3923 							inforec.notdone.copy_wronglabels++;
3924 						}
3925 						return;
3926 					}
3927 				}
3928 			}
3929 
3930 			matocsserv_get_server_groups(rcsids,MaxWriteRepl[lclass],servmaxpos);
3931 			// rservcount = number of servers that are allowed to start new replication
3932 			if (prilevel<DPRIORITY_UNDERGOAL_MFR) {
3933 				rservcount = servmaxpos[CSSTATE_OVERLOADED];
3934 			} else {
3935 				rservcount = servmaxpos[CSSTATE_OK];
3936 			}
3937 //			rservcount = matocsserv_getservers_lessrepl(rcsids,MaxWriteRepl[lclass],(j<DPRIORITY_UNDERGOAL_MFR)?1:0,&allservflag);
3938 			rgvc=0;
3939 			rgtdc=0;
3940 			vripcnt=0;
3941 			for (s=c->slisthead ; s ; s=s->next) {
3942 				if (matocsserv_replication_read_counter(cstab[s->csid].ptr,now)<MaxReadRepl[lclass]) {
3943 					if (s->valid==VALID) {
3944 						rgvc++;
3945 						if (vripcnt<255 && ReplicationsRespectTopology>1) {
3946 							if (matocsserv_get_csdata(cstab[s->csid].ptr,vrip+vripcnt,NULL,NULL,NULL)==0) {
3947 								vripcnt++;
3948 							}
3949 						}
3950 					} else if (s->valid==TDVALID) {
3951 						rgtdc++;
3952 						if (vripcnt<255 && ReplicationsRespectTopology>1) {
3953 							if (matocsserv_get_csdata(cstab[s->csid].ptr,vrip+vripcnt,NULL,NULL,NULL)==0) {
3954 								vripcnt++;
3955 							}
3956 						}
3957 					}
3958 				}
3959 			}
3960 			if (ReplicationsRespectTopology>1) {
3961 				chunk_rack_sort(rcsids,rservcount,vrip,vripcnt);
3962 			}
3963 			if (rgvc+rgtdc>0 && rservcount>0) { // have at least one server to read from and at least one to write to
3964 				if ((DoNotUseSameIP | DoNotUseSameRack | LabelUniqueMask) || sclass_has_keeparch_labels(c->sclassid,c->archflag)) { // labels version
3965 					uint8_t sclass_mode;
3966 					uint16_t maxlimited;
3967 					uint16_t dstservcnt;
3968 					// reverse order - do_perfect_match matches servers 'from right' - this is important when ReplicationsRespectTopology>1
3969 					servcnt = 0;
3970 					// first set all servers with limited write replication counter
3971 					for (i=servmaxpos[CSSTATE_OVERLOADED] ; i<servmaxpos[CSSTATE_LIMIT_REACHED] ; i++) {
3972 						for (s=c->slisthead ; s && s->csid!=rcsids[i] ; s=s->next) {}
3973 						if (s==NULL) {
3974 							servers[servcnt++] = rcsids[i];
3975 						}
3976 					}
3977 					// then add overloaded servers (but only when they are not already included in rservcount)
3978 					for (i=rservcount ; i<servmaxpos[CSSTATE_OVERLOADED] ; i++) {
3979 						for (s=c->slisthead ; s && s->csid!=rcsids[i] ; s=s->next) {}
3980 						if (s==NULL) {
3981 							servers[servcnt++] = rcsids[i];
3982 						}
3983 					}
3984 					maxlimited = servcnt;
3985 					// then all valid servers in reverse order (matching from the 'right')
3986 					for (i=0 ; i<rservcount ; i++) {
3987 						for (s=c->slisthead ; s && s->csid!=rcsids[rservcount-1-i] ; s=s->next) {}
3988 						if (s==NULL) {
3989 							servers[servcnt++] = rcsids[rservcount-1-i];
3990 						}
3991 					}
3992 					dstservcnt = servcnt;
3993 					for (s=c->slisthead ; s ; s=s->next) {
3994 						if (s->valid==VALID) {
3995 							servers[servcnt++] = s->csid;
3996 						}
3997 					}
3998 					labelcnt = sclass_get_keeparch_labelmasks(c->sclassid,c->archflag,&labelmasks);
3999 					sclass_mode = sclass_get_mode(c->sclassid);
4000 					matching = do_perfect_match(labelcnt,servcnt,labelmasks,servers);
4001 					for (i=0 ; i<labelcnt ; i++) {
4002 						int32_t servpos;
4003 						if (matching[i]<(int32_t)labelcnt) {
4004 							servpos=-1;
4005 						} else {
4006 							servpos=matching[i]-labelcnt;
4007 						}
4008 //						syslog(LOG_NOTICE,"labelpos: %u ; serverpos: %d ; serverip: %s ; serverlabels: %s",i,servpos,matocsserv_getstrip(cstab[servers[servpos]].ptr),matocsserv_server_get_labelstr(cstab[servers[servpos]].ptr));
4009 						if (servpos<0) { // matched but only on 'no space' server (or totally unmatched)
4010 							if (sclass_mode==SCLASS_MODE_STRICT) {
4011 								canbefixed = 0;
4012 							} else { // all other modes - labels matched to 'no space' servers only can be replicated anywhere
4013 								for (j=maxlimited ; j<dstservcnt ; j++) { // check all possible destination servers
4014 									if (matching[j+labelcnt]<0) { // matched to label? - if not then we can use it
4015 //										syslog(LOG_NOTICE,"replicate to first available server (STD MODE and LOOSE MODE)");
4016 										if (chunk_undergoal_replicate(c, servers[j], now, lclass, prilevel, &inforec, rgvc, rgtdc)>=0) {
4017 											return;
4018 										}
4019 									}
4020 								}
4021 							}
4022 						} else if (servpos<maxlimited) { // matched but only on 'busy' server
4023 							if (sclass_mode==SCLASS_MODE_STRICT) {
4024 								canbefixed = 0;
4025 							} else if (sclass_mode==SCLASS_MODE_LOOSE) { // in loose mode labels matched only to overloaded servers
4026 								for (j=maxlimited ; j<dstservcnt ; j++) { // check all possible destination servers
4027 									if (matching[j+labelcnt]<0) { // matched to label? - if not then we can use it
4028 //										syslog(LOG_NOTICE,"replicate to first available server (LOOSE MODE)");
4029 										if (chunk_undergoal_replicate(c, servers[j], now, lclass, prilevel, &inforec, rgvc, rgtdc)>=0) {
4030 											return;
4031 										}
4032 									}
4033 								}
4034 							} // in standard mode - we leave matching and then do not perform replication
4035 						} else if (servpos<dstservcnt) { // can be replicated to correct label
4036 //							syslog(LOG_NOTICE,"replicate to correct label (all modes)");
4037 							if (chunk_undergoal_replicate(c, servers[servpos], now, lclass, prilevel, &inforec, rgvc, rgtdc)>=0) {
4038 								return;
4039 							}
4040 						}
4041 					}
4042 /*
4043 					allowallservers = 0;
4044 					if (scount<=rservcount) { // all servers can accept replication
4045 						uint32_t unmatchedlabels;
4046 						unmatchedlabels = 0;
4047 						for (i=0 ; i<labelcnt ; i++) {
4048 							if (matching[i]<0) {
4049 								unmatchedlabels++;
4050 							}
4051 						}
4052 						if (vc >= goal) { // not undergoal chunk
4053 							if (unmatchedlabels >= forcereplication) { // can't fix wrong labels
4054 								canbefixed = 0;
4055 							}
4056 						} else { // this is undergoal chunk
4057 							if (goal>scount && vc==scount) {
4058 								canbefixed = 0;
4059 							} else if (unmatchedlabels>0) { // can't match all labels, so use all chunkservers
4060 								allowallservers = 1;
4061 							}
4062 						}
4063 					}
4064 */
4065 /*
4066 					if (vc < sclass_getgoal(c->sclassid) && csdb_servers_count()<=rservcount) { // all servers can accept replications
4067 						for (i=0 ; i<labelcnt ; i++) {
4068 							if (matching[i]<0) {
4069 								unmatchedlabels++;
4070 							}
4071 						}
4072 					}
4073 */
4074 /*					for (i=0 ; i<dstservcnt && canbefixed ; i++) {
4075 						if (matching[i+labelcnt]>=0 || allowallservers) {
4076 							if (chunk_undergoal_replicate(c, servers[i], now, lclass, prilevel, &inforec, rgvc, rgtdc)>=0) {
4077 								return;
4078 							}
4079 						}
4080 					}
4081 */
4082 				} else { // classic goal version
4083 					if (goal>servmaxpos[CSSTATE_LIMIT_REACHED] && vc==servmaxpos[CSSTATE_LIMIT_REACHED]) {
4084 						canbefixed = 0;
4085 					} else {
4086 						for (i=0 ; i<rservcount ; i++) {
4087 							for (s=c->slisthead ; s && s->csid!=rcsids[i] ; s=s->next) {}
4088 							if (!s) {
4089 								if (chunk_undergoal_replicate(c, rcsids[i], now, lclass, prilevel, &inforec, rgvc, rgtdc)>=0) {
4090 									return;
4091 								}
4092 							}
4093 						}
4094 					}
4095 				}
4096 			}
4097 			if (canbefixed) { // enqueue only chunks which can be fixed and only if there are servers which reached replication limits
4098 				chunk_priority_enqueue(prilevel,c);
4099 			}
4100 		}
4101 		if (vc < goal) {
4102 			inforec.notdone.copy_undergoal++;
4103 		} else {
4104 			if (canbefixed==0) {
4105 				inforec.labels_dont_match++;
4106 			} else {
4107 				inforec.notdone.copy_wronglabels++;
4108 			}
4109 		}
4110 	}
4111 
4112 	if (extrajob) { // do not rebalane doing "extra" jobs.
4113 		return;
4114 	}
4115 
4116 	if (fullservers==0) {
4117 		uint8_t queues_empty;
4118 		queues_empty = 1;
4119 		for (i=0 ; i<DANGER_PRIORITIES ; i++) {
4120 			if (chunks_priority_leng[i]>0) {
4121 				if (i<REPLICATION_DANGER_PRIORITIES) {
4122 					return; // we have pending undergaal/wronglabeled chunks, so ignore chunkserver rebalance
4123 				} else {
4124 					queues_empty = 0;
4125 				}
4126 			}
4127 		}
4128 		if (queues_empty) {
4129 			if (c->ondangerlist) {
4130 				syslog(LOG_NOTICE,"chunk %016"PRIX64"_%08"PRIX32": fixing 'ondangerlist' flag",c->chunkid,c->version);
4131 				c->ondangerlist = 0;
4132 			}
4133 		}
4134 	}
4135 
4136 	if (goal == vc && vc+tdc>0) {
4137 		double maxdiff;
4138 		uint8_t sclass_mode;
4139 		uint8_t need_label_version;
4140 
4141 //		need_label_version = ((DoNotUseSameIP | DoNotUseSameRack | LabelUniqueMask) || sclass_has_keeparch_labels(c->sclassid,c->archflag))?1:0;
4142 		need_label_version = sclass_has_keeparch_labels(c->sclassid,c->archflag)?1:0;
4143 
4144 		servcnt = 0;
4145 		for (s=c->slisthead ; s ; s=s->next) {
4146 			if (s->valid==VALID) {
4147 				servers[servcnt++] = s->csid;
4148 			}
4149 		}
4150 		extraservcnt = servcnt;
4151 		for (s=c->slisthead ; s ; s=s->next) {
4152 			if (s->valid!=VALID) {
4153 				servers[extraservcnt++] = s->csid;
4154 			}
4155 		}
4156 
4157 		if (need_label_version) {
4158 			labelcnt = sclass_get_keeparch_labelmasks(c->sclassid,c->archflag,&labelmasks);
4159 			sclass_mode = sclass_get_mode(c->sclassid);
4160 			matching = do_perfect_match(labelcnt,servcnt,labelmasks,servers);
4161 		} else {
4162 			// set something to silence potential compiler warnings
4163 			labelcnt = goal;
4164 			sclass_mode = SCLASS_MODE_LOOSE;
4165 			matching = NULL;
4166 		}
4167 
4168 		if (dservcount==0) {
4169 			dservcount = matocsserv_getservers_ordered(dcsids);
4170 		}
4171 
4172 		srccsid = MAXCSCOUNT;
4173 		dstcsid = MAXCSCOUNT;
4174 		maxdiff = 0.0;
4175 
4176 		for (i=0 ; i<servcnt ; i++) {
4177 			uint32_t *labelmask;
4178 			uint8_t anyunmatched;
4179 			double srcusage,dstusage;
4180 			uint8_t lclass;
4181 
4182 			labelmask = NULL;
4183 			anyunmatched = 0;
4184 			if (need_label_version) {
4185 				if (matching[labelcnt+i]>=0) {
4186 					labelmask = labelmasks[matching[labelcnt+i]];
4187 				} else if (sclass_mode!=SCLASS_MODE_LOOSE) {
4188 					anyunmatched = 1;
4189 					// in normal and strict mode you can use any definition that do not match any copy (it will even fix wrong labels)
4190 				}
4191 			}
4192 			srcusage = matocsserv_get_usage(cstab[servers[i]].ptr);
4193 			repl_read_counter = matocsserv_replication_read_counter(cstab[servers[i]].ptr,now);
4194 			if (repl_read_counter < MaxReadRepl[2] || repl_read_counter < MaxReadRepl[3]) { // here accept any rebalance limit
4195 				for (j=0 ; j<dservcount ; j++) {
4196 					dstusage = matocsserv_get_usage(cstab[dcsids[j]].ptr);
4197 					if (srcusage - dstusage < maxdiff) { // already found better src,dst pair
4198 						break;
4199 					}
4200 					if (((srcusage - dstusage) <= AcceptableDifference) && last_rebalance+(0.01/(srcusage-dstusage))>=now) { // when difference is small then do not hurry
4201 						break;
4202 					}
4203 					k = 0;
4204 					while (k<extraservcnt) {
4205 						if (servers[k]==dcsids[j]) {
4206 							break;
4207 						}
4208 						if (DoNotUseSameIP) {
4209 							if (matocsserv_server_get_ip(cstab[servers[k]].ptr)==matocsserv_server_get_ip(cstab[dcsids[j]].ptr)) {
4210 								break;
4211 							}
4212 						} else if (DoNotUseSameRack) {
4213 							if (topology_get_rackid(matocsserv_server_get_ip(cstab[servers[k]].ptr))==topology_get_rackid(matocsserv_server_get_ip(cstab[dcsids[j]].ptr))) {
4214 								break;
4215 							}
4216 						} else if (LabelUniqueMask) {
4217 							if ((matocsserv_server_get_labelmask(cstab[servers[k]].ptr) & LabelUniqueMask) == (matocsserv_server_get_labelmask(cstab[dcsids[j]].ptr) & LabelUniqueMask)) {
4218 								break;
4219 							}
4220 						}
4221 						k++;
4222 					}
4223 					if (k<extraservcnt) { // one of existing copies is on this server or this server has same IP/RACKID/UNIQLABELS
4224 						continue;
4225 					}
4226 					if (anyunmatched) { // check if this server match any of unmatched labels
4227 						for (k=0 ; k<labelcnt ; k++) {
4228 							if (matching[k]<0) {
4229 								if (matocsserv_server_has_labels(cstab[dcsids[j]].ptr,labelmasks[k])) {
4230 									break;
4231 								}
4232 							}
4233 						}
4234 						if (k==labelcnt) { // nope
4235 							break;
4236 						}
4237 					}
4238 					if (labelmask==NULL || matocsserv_server_has_labels(cstab[dcsids[j]].ptr,labelmask)) { // if we have specific label then check if this server has them
4239 						if ((srcusage - dstusage) > AcceptableDifference*1.5) { // now we know usage difference, so we can set proper limit class
4240 							lclass = 3;
4241 						} else {
4242 							lclass = 2;
4243 						}
4244 						if (repl_read_counter < MaxReadRepl[lclass] && matocsserv_replication_write_counter(cstab[dcsids[j]].ptr,now)<MaxWriteRepl[lclass]) {
4245 							maxdiff = srcusage - dstusage;
4246 							dstcsid = dcsids[j];
4247 							srccsid = servers[i];
4248 						}
4249 					}
4250 				}
4251 			}
4252 		}
4253 
4254 		if (dstcsid!=MAXCSCOUNT && srccsid!=MAXCSCOUNT) {
4255 			stats_chunkops[CHUNK_OP_REPLICATE_TRY]++;
4256 			matocsserv_send_replicatechunk(cstab[dstcsid].ptr,c->chunkid,c->version,cstab[srccsid].ptr);
4257 			c->needverincrease = 1;
4258 			inforec.copy_rebalance++;
4259 			last_rebalance = now;
4260 		}
4261 
4262 		return;
4263 	}
4264 }
4265 
chunk_labelset_can_be_fulfilled(uint8_t labelcnt,uint32_t ** labelmasks)4266 uint8_t chunk_labelset_can_be_fulfilled(uint8_t labelcnt,uint32_t **labelmasks) {
4267 	static uint16_t *stdcsids = NULL;
4268 	static uint16_t *olcsids = NULL;
4269 	static uint16_t *allcsids = NULL;
4270 	static uint16_t stdcscnt;
4271 	static uint16_t olcscnt;
4272 	static uint16_t allcscnt;
4273 	uint8_t r;
4274 	uint32_t i;
4275 	int32_t *matching;
4276 
4277 	if (stdcsids==NULL) {
4278 		stdcsids = malloc(sizeof(uint16_t)*MAXCSCOUNT);
4279 		passert(stdcsids);
4280 	}
4281 	if (olcsids==NULL) {
4282 		olcsids = malloc(sizeof(uint16_t)*MAXCSCOUNT);
4283 		passert(olcsids);
4284 	}
4285 	if (allcsids==NULL) {
4286 		allcsids = malloc(sizeof(uint16_t)*MAXCSCOUNT);
4287 		passert(allcsids);
4288 	}
4289 
4290 	if (labelcnt==0 || labelmasks==NULL) {
4291 		matocsserv_getservers_test(&stdcscnt,stdcsids,&olcscnt,olcsids,&allcscnt,allcsids);
4292 	}
4293 
4294 	matching = do_perfect_match(labelcnt,stdcscnt,labelmasks,stdcsids);
4295 
4296 	r = 1;
4297 	for (i=0 ; i<labelcnt ; i++) {
4298 		if (matching[i]<0) {
4299 			r = 0;
4300 			break;
4301 		}
4302 	}
4303 	if (r==1) {
4304 		return 3; // can be fulfilled
4305 	}
4306 
4307 	if (olcsids > stdcsids) {
4308 		matching = do_perfect_match(labelcnt,olcscnt,labelmasks,olcsids);
4309 
4310 		r = 1;
4311 		for (i=0 ; i<labelcnt ; i++) {
4312 			if (matching[i]<0) {
4313 				r = 0;
4314 				break;
4315 			}
4316 		}
4317 		if (r==1) {
4318 			return 2; // can be fulfilled using overloaded servers
4319 		}
4320 	}
4321 
4322 	if (allcsids > olcsids) {
4323 		matching = do_perfect_match(labelcnt,allcscnt,labelmasks,allcsids);
4324 
4325 		r = 1;
4326 		for (i=0 ; i<labelcnt ; i++) {
4327 			if (matching[i]<0) {
4328 				r = 0;
4329 				break;
4330 			}
4331 		}
4332 		if (r==1) {
4333 			return 1; // can be fulfilled using servers with no space available
4334 		}
4335 	}
4336 
4337 	return 0;
4338 }
4339 
chunk_clean_priority_queues(void)4340 static inline void chunk_clean_priority_queues(void) {
4341 	uint32_t j,l;
4342 	for (j=0 ; j<DANGER_PRIORITIES ; j++) {
4343 		for (l=0 ; l<DangerMaxLeng ; l++) {
4344 			if (chunks_priority_queue[j][l]!=NULL) {
4345 				chunks_priority_queue[j][l]->ondangerlist = 0;
4346 			}
4347 			chunks_priority_queue[j][l] = NULL;
4348 		}
4349 		chunks_priority_head[j] = 0;
4350 		chunks_priority_tail[j] = 0;
4351 		chunks_priority_leng[j] = 0;
4352 	}
4353 }
4354 
chunk_unlock(uint64_t chunkid)4355 int chunk_unlock(uint64_t chunkid) {
4356 	chunk *c;
4357 
4358 	c = chunk_find(chunkid);
4359 	if (c==NULL) {
4360 		return MFS_ERROR_NOCHUNK;
4361 	}
4362 	c->lockedto = 0;
4363 	chunk_write_counters(c,0);
4364 	chunk_priority_queue_check(c,1);
4365 	if (c->ondangerlist) {
4366 		chunk_do_jobs(c,matocsserv_servers_count(),matocsserv_almostfull_servers(),main_time(),1);
4367 	} else {
4368 		matoclserv_chunk_unlocked(c->chunkid,c);
4369 	}
4370 	return MFS_STATUS_OK;
4371 }
4372 
chunk_mr_unlock(uint64_t chunkid)4373 int chunk_mr_unlock(uint64_t chunkid) {
4374 	chunk *c;
4375 	c = chunk_find(chunkid);
4376 	if (c==NULL) {
4377 		return MFS_ERROR_NOCHUNK;
4378 	}
4379 	c->lockedto = 0;
4380 	chunk_write_counters(c,0);
4381 	return MFS_STATUS_OK;
4382 }
4383 
4384 
/*
 * Periodic chunk maintenance entry point.
 *
 * On every call:
 *  1. runs disconnection loop and delay checks for chunkservers,
 *  2. gives up early when chunk counters are still being recalculated, when
 *     initial replication delay has not passed yet, or when there are no
 *     servers / no hash positions,
 *  3. serves chunks from danger priority queues (at most HashCPTMax in total,
 *     with a guaranteed share for each priority),
 *  4. walks a part of the chunk hash table (at most HashCPTMax chunks),
 *     deleting unused chunks and calling chunk_do_jobs for the rest.
 */
void chunk_jobs_main(void) {
	uint32_t i,j,l,h,t,lc,hashsteps;
	uint16_t scount,csid;
	uint16_t fullservers;
	chunk *c,*cn;
	uint32_t now;
#ifdef MFSDEBUG
	static uint32_t lastsecond=0;
#endif

	chunk_server_disconnection_loop();
	chunk_server_check_delays();

	if (chunk_counters_in_progress()) {
		return;
	}
	// do nothing until ReplicationsDelayInit seconds passed since 'starttime'
	if (starttime+ReplicationsDelayInit>main_time()) {
		return;
	}

	// full servers are not included here
	scount = matocsserv_servers_count();

	if (scount==0 || chunkrehashpos==0) {
		return;
	}

	fullservers = matocsserv_almostfull_servers();

	now = main_time();
	// call with c==NULL and JOBS_EVERYTICK - presumably per-tick housekeeping inside chunk_do_jobs
	chunk_do_jobs(NULL,JOBS_EVERYTICK,0,now,0);

	// first serve some endangered and undergoal chunks
	// lc counts chunks served from priority queues during this call
	lc = 0;
	for (j=0 ; j<DANGER_PRIORITIES ; j++) {
		// sanity check of the ring buffer: tail+leng (modulo queue size) has to be equal to head; if not then reset whole queue
		if (((chunks_priority_tail[j]+chunks_priority_leng[j])%DangerMaxLeng) != chunks_priority_head[j]) {
			syslog(LOG_NOTICE,"danger_priority_group %"PRIu32": serious structure error, head: %"PRIu32"; tail: %"PRIu32"; leng: %"PRIu32,j,chunks_priority_head[j],chunks_priority_tail[j],+chunks_priority_leng[j]);
			for (l=0 ; l<DangerMaxLeng ; l++) {
				if (chunks_priority_queue[j][l]!=NULL) {
					chunks_priority_queue[j][l]->ondangerlist = 0;
				}
				chunks_priority_queue[j][l] = NULL;
			}
			chunks_priority_head[j] = 0;
			chunks_priority_tail[j] = 0;
			chunks_priority_leng[j] = 0;
		}
#ifdef MFSDEBUG
		// remember queue length before processing (used only in debug log below)
		l = chunks_priority_leng[j];
#endif
		// lowering lc here leaves at least chunks_priority_mincpsperc[j]*HashCPTMax chunks for this priority
		if (lc > (1.0 - chunks_priority_mincpsperc[j])*HashCPTMax) { // prevent starvation of lowest priority chunks by highest priority chunks
			lc = HashCPTMax * (1.0 - chunks_priority_mincpsperc[j]);
		}
		if (chunks_priority_leng[j]>0 && lc<HashCPTMax) {
			h = chunks_priority_head[j];
			t = chunks_priority_tail[j];
			// take chunks from tail until position of head (as it was at the beginning) or until the limit is reached
			do {
				c = chunks_priority_queue[j][t];
				chunks_priority_queue[j][t] = NULL;
				t = (t+1)%DangerMaxLeng;
				chunks_priority_tail[j] = t;
				chunks_priority_leng[j]--;
				// empty (NULL) slots are consumed but not counted in lc
				if (c!=NULL) {
					c->ondangerlist = 0;
					chunk_do_jobs(c,scount,fullservers,now,1);
					lc++;
				}
			} while (t!=h && lc<HashCPTMax);
		}
#ifdef MFSDEBUG
		// log queue length change at most once per second
		if (now!=lastsecond) {
			syslog(LOG_NOTICE,"danger_priority_group %"PRIu32": %"PRIu32"->%"PRIu32,j,l,chunks_priority_leng[j]);
		}
#endif
	}
#ifdef MFSDEBUG
	lastsecond=now;
#endif

	// then serve standard chunks
	lc = 0;
	// number of hash positions to visit in this call (at least one)
	hashsteps = 1+((chunkrehashpos)/(LoopTimeMin*TICKSPERSECOND));
	for (i=0 ; i<hashsteps && lc<HashCPTMax ; i++) {
		if (jobshcnt>=chunkrehashpos) {
			// all hash positions have been visited - finish this loop and prepare the next one
			chunk_do_jobs(NULL,JOBS_EVERYLOOP,0,now,0);	// every loop tasks
			jobshpos = 0;
			jobshcnt = 0;
			jobshmax = chunkrehashpos;
			// next loop uses step 16 times bigger; go back to 1 on overflow to zero or when step reaches jobshmax
			jobshstep *= 16;
			if (jobshstep==0 || jobshstep>=jobshmax) {
				jobshstep = 1;
			} else {
				// make jobshmax odd - presumably to keep it coprime with the step, so that stepping modulo jobshmax visits every position (TODO confirm)
				if ((jobshmax&1)==0) {
					jobshmax--;
				}
			}
			// advance 'mfr_state' of every used chunkserver by one stage per finished loop:
			// REPL_IN_PROGRESS -> WAS_IN_PROGRESS -> CAN_BE_REMOVED and (other states) -> UNKNOWN_SOFT -> CAN_BE_REMOVED
			for (csid = csusedhead ; csid < MAXCSCOUNT ; csid = cstab[csid].next) {
				switch (cstab[csid].mfr_state) {
					case CAN_BE_REMOVED:
					case UNKNOWN_SOFT:
					case WAS_IN_PROGRESS:
						cstab[csid].mfr_state = CAN_BE_REMOVED;
						break;
					case REPL_IN_PROGRESS:
						cstab[csid].mfr_state = WAS_IN_PROGRESS;
						break;
					/* case UNKNOWN_HARD: */
					default:
						cstab[csid].mfr_state = UNKNOWN_SOFT;
						break;
				}
			}
		} else {
			c = chunkhashtab[jobshpos>>HASHTAB_LOBITS][jobshpos&HASHTAB_MASK];
			while (c) {
				// remember next element now - current chunk may be deleted below
				cn = c->next;
				// chunk with expired lock, without copies, without file references and not queued is removed (only when counters are ready and all servers are known)
				if (c->lockedto<(uint32_t)main_time() && c->slisthead==NULL && c->fcount==0 && c->ondangerlist==0 && chunk_counters_in_progress()==0 && csdb_have_all_servers()) {
					changelog("%"PRIu32"|CHUNKDEL(%"PRIu64",%"PRIu32")",main_time(),c->chunkid,c->version);
					chunk_delete(c);
				} else {
					chunk_do_jobs(c,scount,fullservers,now,0);
					lc++;
				}
				c = cn;
			}
			jobshcnt++;
			// first jobshmax positions are visited using step (modulo jobshmax); remaining ones (when jobshmax was decreased) are visited sequentially
			if (jobshcnt<jobshmax) {
				jobshpos += jobshstep;
				jobshpos %= jobshmax;
			} else {
				jobshpos = jobshcnt;
			}
		}
	}
}
4520 
4521 /* ---- */
4522 
4523 #define CHUNKFSIZE 17
4524 /*
4525 void chunk_text_dump(FILE *fd) {
4526 	chunk *c;
4527 	uint32_t i,lockedto,now;
4528 	now = main_time();
4529 
4530 	for (i=0 ; i<chunkrehashpos ; i++) {
4531 		for (c=chunkhashtab[i>>HASHTAB_LOBITS][i&HASHTAB_MASK] ; c ; c=c->next) {
4532 			lockedto = c->lockedto;
4533 			if (lockedto<now) {
4534 				lockedto = 0;
4535 			}
4536 			fprintf(fd,"*|i:%016"PRIX64"|v:%08"PRIX32"|g:%"PRIu8"|t:%10"PRIu32"\n",c->chunkid,c->version,c->sclassid,lockedto);
4537 		}
4538 	}
4539 }
4540 */
/*
 * Loads chunks section from metadata stream.
 *
 * Layout read here: 8 bytes header (next chunk id) followed by fixed size
 * records: chunkid (64 bits), version (32 bits), lockedto (32 bits) and - in
 * every version other than 0x10 - archflag (8 bits). Record with chunkid==0
 * and all other fields equal to zero ends the section.
 *
 * fd   - input stream
 * mver - version of the section (0x10 means 16-byte records without archflag)
 *
 * returns 0 on success, -1 on read error or malformed terminating record
 */
int chunk_load(bio *fd,uint8_t mver) {
	uint8_t header[8];
	uint8_t record[CHUNKFSIZE];
	const uint8_t *rptr;
	int32_t got,reclen;
	chunk *ch;
// chunkdata
	uint64_t chunkid;
	uint32_t version,lockedto;
	uint8_t archflag;

	chunks=0;
	if (bio_read(fd,header,8)!=8) {
		syslog(LOG_WARNING,"chunks: can't read header");
		return -1;
	}
	rptr = header;
	nextchunkid = get64bit(&rptr);
	if (mver==0x10) {
		reclen = 16;
	} else {
		reclen = CHUNKFSIZE;
	}
	while (1) {
		got = bio_read(fd,record,reclen);
		if (got!=reclen) {
			syslog(LOG_WARNING,"chunks: read error");
			return -1;
		}
		rptr = record;
		chunkid = get64bit(&rptr);
		version = get32bit(&rptr);
		lockedto = get32bit(&rptr);
		archflag = 0;
		if (mver!=0x10) {
			archflag = get8bit(&rptr);
		}
		if (chunkid==0) { // terminating record - has to be filled with zeros
			if (version!=0 || lockedto!=0 || archflag!=0) {
				syslog(LOG_WARNING,"chunks: wrong ending - chunk zero with version: %"PRIu32" and locked to: %"PRIu32,version,lockedto);
				return -1;
			}
			return 0;
		}
		ch = chunk_new(chunkid);
		ch->version = version;
		ch->lockedto = lockedto;
		ch->archflag = archflag;
	}
	return 0;	// unreachable
}
4591 
/*
 * Stores chunks section into metadata stream.
 *
 * Writes 8 bytes header (next chunk id), then one CHUNKFSIZE record per chunk
 * (chunkid, version, lockedto, archflag) and finally one record filled with
 * zeros as the end marker. Locks that are already in the past ('lockedto'
 * lower than current time) are stored as 0.
 *
 * returns:
 *   0x11 - when fd is NULL (presumably version of the section format - TODO confirm against caller)
 *   0xFF - on write error
 *   0    - on success
 */
uint8_t chunk_store(bio *fd) {
	uint8_t header[8];
	uint8_t record[CHUNKFSIZE];
	uint8_t *wptr;
	uint32_t hpos;
	uint32_t lockedto,now;
	chunk *ch;

	if (fd==NULL) {
		return 0x11;
	}
	now = main_time();
	wptr = header;
	put64bit(&wptr,nextchunkid);
	if (bio_write(fd,header,8)!=8) {
		return 0xFF;
	}
	for (hpos=0 ; hpos<chunkrehashpos ; hpos++) {
		ch = chunkhashtab[hpos>>HASHTAB_LOBITS][hpos&HASHTAB_MASK];
		while (ch!=NULL) {
			lockedto = ch->lockedto;
			if (lockedto<now) { // expired lock - do not store it
				lockedto = 0;
			}
			wptr = record;
			put64bit(&wptr,ch->chunkid);
			put32bit(&wptr,ch->version);
			put32bit(&wptr,lockedto);
			put8bit(&wptr,ch->archflag);
			if (bio_write(fd,record,CHUNKFSIZE)!=CHUNKFSIZE) {
				return 0xFF;
			}
			ch = ch->next;
		}
	}
	// end marker - record with all fields equal to zero
	memset(record,0,CHUNKFSIZE);
	if (bio_write(fd,record,CHUNKFSIZE)!=CHUNKFSIZE) {
		return 0xFF;
	}
	return 0;
}
4638 
chunk_cleanup(void)4639 void chunk_cleanup(void) {
4640 	uint32_t i,j;
4641 	discserv *ds;
4642 //	slist_bucket *sb,*sbn;
4643 //	chunk_bucket *cb,*cbn;
4644 
4645 	chunk_clean_priority_queues();
4646 	while (discservers) {
4647 		ds = discservers;
4648 		discservers = discservers->next;
4649 		matocsserv_disconnection_finished(cstab[ds->csid].ptr);
4650 		free(ds);
4651 	}
4652 	while (discservers_next) {
4653 		ds = discservers_next;
4654 		discservers_next = discservers_next->next;
4655 		matocsserv_disconnection_finished(cstab[ds->csid].ptr);
4656 		free(ds);
4657 	}
4658 	slist_free_all();
4659 //	for (sb = sbhead ; sb ; sb = sbn) {
4660 //		sbn = sb->next;
4661 //		free(sb);
4662 //	}
4663 //	sbhead = NULL;
4664 //	slfreehead = NULL;
4665 	chunk_free_all();
4666 //	for (cb = cbhead ; cb ; cb = cbn) {
4667 //		cbn = cb->next;
4668 //		free(cb);
4669 //	}
4670 //	cbhead = NULL;
4671 //	chfreehead = NULL;
4672 	chunk_hash_cleanup();
4673 //	for (i=0 ; i<HASHSIZE ; i++) {
4674 //		chunkhash[i] = NULL;
4675 //	}
4676 	for (i=0 ; i<MAXCSCOUNT ; i++) {
4677 		cstab[i].next = i+1;
4678 		cstab[i].prev = i-1;
4679 		cstab[i].valid = 0;
4680 		cstab[i].registered = 0;
4681 		cstab[i].newchunkdelay = 0;
4682 		cstab[i].lostchunkdelay = 0;
4683 	}
4684 	cstab[0].prev = MAXCSCOUNT;
4685 	csfreehead = 0;
4686 	csfreetail = MAXCSCOUNT-1;
4687 	csusedhead = MAXCSCOUNT;
4688 	for (i=0 ; i<MAXSCLASS*2 ; i++) {
4689 		for (j=0 ; j<11 ; j++) {
4690 			allchunkcounts[i][j]=0;
4691 			regularchunkcounts[i][j]=0;
4692 		}
4693 	}
4694 }
4695 
chunk_newfs(void)4696 void chunk_newfs(void) {
4697 	chunks = 0;
4698 	nextchunkid = 1;
4699 }
4700 
/* Advance past spaces, tabs, CRs and LFs; returns pointer to the first other character. */
static char* chunk_rep_list_skip_ws(char *p) {
	while (*p==' ' || *p=='\t' || *p=='\r' || *p=='\n') {
		p++;
	}
	return p;
}

/*
 * Parse a replication limit list.
 *
 * Accepted formats (whitespace around numbers is ignored):
 *   N        - one number, used for all four limits
 *   A,B,C,D  - exactly four comma separated numbers
 *
 * strlist - NUL-terminated text to parse (not modified)
 * replist - output table of four doubles; written only on success
 *
 * Returns:
 *    0 - single-number format, replist[0..3] all set to N
 *    1 - four-number format, replist[0..3] set to A,B,C,D
 *   -1 - syntax error (wrong number of fields, missing number,
 *        trailing garbage); replist is left untouched
 */
int chunk_parse_rep_list(char *strlist,double *replist) {
	char *p,*end;
	uint32_t i;
	double reptmp[4];

	p = strlist;
	for (i=0 ; i<4 ; i++) {
		if (i>0) {
			if (*p!=',') {
				return -1;
			}
			p++;
		}
		p = chunk_rep_list_skip_ws(p);
		reptmp[i] = strtod(p,&end);
		if (end==p) {
			// strtod consumed nothing - field is empty or not a number;
			// do not silently treat it as 0
			return -1;
		}
		p = chunk_rep_list_skip_ws(end);
		if (i==0 && *p==0) {
			// single number: same limit for every position
			for (i=0 ; i<4 ; i++) {
				replist[i] = reptmp[0];
			}
			return 0;
		}
	}
	if (*p!=0) {
		return -1;
	}
	for (i=0 ; i<4 ; i++) {
		replist[i] = reptmp[i];
	}
	return 1;
}
4743 
chunk_term(void)4744 void chunk_term(void) {
4745 	uint32_t i;
4746 	chunk_priority_queue_check(NULL,1); // free tabs
4747 	chunk_do_jobs(NULL,JOBS_TERM,0,main_time(),0); // free tabs
4748 	for (i=0 ; i<MAXSCLASS*2 ; i++) {
4749 		free(allchunkcounts[i]);
4750 		free(regularchunkcounts[i]);
4751 	}
4752 	free(allchunkcounts);
4753 	free(regularchunkcounts);
4754 }
4755 
chunk_load_cfg_common(void)4756 void chunk_load_cfg_common(void) {
4757 	uint32_t uniqmode;
4758 
4759 	uniqmode = cfg_getuint32("CHUNKS_UNIQUE_MODE",0);
4760 
4761 	DoNotUseSameIP=0;
4762 	DoNotUseSameRack=0;
4763 	LabelUniqueMask=0;
4764 
4765 	if (uniqmode==1) {
4766 		DoNotUseSameIP=1;
4767 	} else if (uniqmode==2) {
4768 		DoNotUseSameRack=1;
4769 	}
4770 
4771 	ReplicationsDelayInit = cfg_getuint32("REPLICATIONS_DELAY_INIT",60);
4772 	ReplicationsRespectTopology = cfg_getuint8("REPLICATIONS_RESPECT_TOPOLOGY",0);
4773 	CreationsRespectTopology = cfg_getuint32("CREATIONS_RESPECT_TOPOLOGY",0);
4774 
4775 	if (cfg_isdefined("ACCEPTABLE_PERCENTAGE_DIFFERENCE")) {
4776 		AcceptableDifference = cfg_getdouble("ACCEPTABLE_PERCENTAGE_DIFFERENCE",1.0)/100.0; // 1%
4777 	} else {
4778 		AcceptableDifference = cfg_getdouble("ACCEPTABLE_DIFFERENCE",0.01); // 1% - deprecated option
4779 	}
4780 	if (AcceptableDifference<0.001) { // 0.1%
4781 		AcceptableDifference = 0.001;
4782 	}
4783 	if (AcceptableDifference>0.1) { // 10%
4784 		AcceptableDifference = 0.1;
4785 	}
4786 
4787 	DangerMaxLeng = cfg_getuint32("PRIORITY_QUEUES_LENGTH",1000000);
4788 	if (DangerMaxLeng<10000) {
4789 		DangerMaxLeng = 10000;
4790 	}
4791 }
4792 
chunk_reload(void)4793 void chunk_reload(void) {
4794 	uint32_t oldMaxDelSoftLimit,oldMaxDelHardLimit,oldDangerMaxLeng;
4795 	uint32_t cps,i,j;
4796 	char *repstr;
4797 
4798 	oldDangerMaxLeng = DangerMaxLeng;
4799 
4800 	oldMaxDelSoftLimit = MaxDelSoftLimit;
4801 	oldMaxDelHardLimit = MaxDelHardLimit;
4802 
4803 	chunk_load_cfg_common();
4804 
4805 	MaxDelSoftLimit = cfg_getuint32("CHUNKS_SOFT_DEL_LIMIT",10);
4806 	if (cfg_isdefined("CHUNKS_HARD_DEL_LIMIT")) {
4807 		MaxDelHardLimit = cfg_getuint32("CHUNKS_HARD_DEL_LIMIT",25);
4808 		if (MaxDelHardLimit<MaxDelSoftLimit) {
4809 			MaxDelSoftLimit = MaxDelHardLimit;
4810 			syslog(LOG_WARNING,"CHUNKS_SOFT_DEL_LIMIT is greater than CHUNKS_HARD_DEL_LIMIT - using CHUNKS_HARD_DEL_LIMIT for both");
4811 		}
4812 	} else {
4813 		MaxDelHardLimit = 3 * MaxDelSoftLimit;
4814 	}
4815 	if (MaxDelSoftLimit==0) {
4816 		MaxDelSoftLimit = oldMaxDelSoftLimit;
4817 		MaxDelHardLimit = oldMaxDelHardLimit;
4818 	}
4819 	if (TmpMaxDelFrac<MaxDelSoftLimit) {
4820 		TmpMaxDelFrac = MaxDelSoftLimit;
4821 	}
4822 	if (TmpMaxDelFrac>MaxDelHardLimit) {
4823 		TmpMaxDelFrac = MaxDelHardLimit;
4824 	}
4825 	if (TmpMaxDel<MaxDelSoftLimit) {
4826 		TmpMaxDel = MaxDelSoftLimit;
4827 	}
4828 	if (TmpMaxDel>MaxDelHardLimit) {
4829 		TmpMaxDel = MaxDelHardLimit;
4830 	}
4831 
4832 
4833 	repstr = cfg_getstr("CHUNKS_WRITE_REP_LIMIT","2,1,1,4");
4834 	switch (chunk_parse_rep_list(repstr,MaxWriteRepl)) {
4835 		case -1:
4836 			syslog(LOG_WARNING,"write replication limit parse error !!!");
4837 			break;
4838 		case 0:
4839 			syslog(LOG_NOTICE,"write replication limit in old format - change limits to new format");
4840 	}
4841 	free(repstr);
4842 	repstr = cfg_getstr("CHUNKS_READ_REP_LIMIT","10,5,2,5");
4843 	switch (chunk_parse_rep_list(repstr,MaxReadRepl)) {
4844 		case -1:
4845 			syslog(LOG_WARNING,"read replication limit parse error !!!");
4846 			break;
4847 		case 0:
4848 			syslog(LOG_NOTICE,"read replication limit in old format - change limits to new format");
4849 	}
4850 	free(repstr);
4851 /*
4852 	repl = cfg_getuint32("CHUNKS_WRITE_REP_LIMIT",2);
4853 	if (repl>0) {
4854 		MaxWriteRepl = repl;
4855 	}
4856 
4857 
4858 	repl = cfg_getuint32("CHUNKS_READ_REP_LIMIT",10);
4859 	if (repl>0) {
4860 		MaxReadRepl = repl;
4861 	}
4862 */
4863 	if (cfg_isdefined("CHUNKS_LOOP_TIME")) {
4864 		LoopTimeMin = cfg_getuint32("CHUNKS_LOOP_TIME",300); // deprecated option
4865 		if (LoopTimeMin < MINLOOPTIME) {
4866 			syslog(LOG_NOTICE,"CHUNKS_LOOP_TIME value too low (%"PRIu32") increased to %u",LoopTimeMin,MINLOOPTIME);
4867 			LoopTimeMin = MINLOOPTIME;
4868 		}
4869 		if (LoopTimeMin > MAXLOOPTIME) {
4870 			syslog(LOG_NOTICE,"CHUNKS_LOOP_TIME value too high (%"PRIu32") decreased to %u",LoopTimeMin,MAXLOOPTIME);
4871 			LoopTimeMin = MAXLOOPTIME;
4872 		}
4873 //		HashSteps = 1+((HASHSIZE)/(LoopTimeMin*TICKSPERSECOND));
4874 		HashCPTMax = 0xFFFFFFFF;
4875 	} else {
4876 		LoopTimeMin = cfg_getuint32("CHUNKS_LOOP_MIN_TIME",300);
4877 		if (LoopTimeMin < MINLOOPTIME) {
4878 			syslog(LOG_NOTICE,"CHUNKS_LOOP_MIN_TIME value too low (%"PRIu32") increased to %u",LoopTimeMin,MINLOOPTIME);
4879 			LoopTimeMin = MINLOOPTIME;
4880 		}
4881 		if (LoopTimeMin > MAXLOOPTIME) {
4882 			syslog(LOG_NOTICE,"CHUNKS_LOOP_MIN_TIME value too high (%"PRIu32") decreased to %u",LoopTimeMin,MAXLOOPTIME);
4883 			LoopTimeMin = MAXLOOPTIME;
4884 		}
4885 //		HashSteps = 1+((HASHSIZE)/(LoopTimeMin*TICKSPERSECOND));
4886 		cps = cfg_getuint32("CHUNKS_LOOP_MAX_CPS",100000);
4887 		if (cps < MINCPS) {
4888 			syslog(LOG_NOTICE,"CHUNKS_LOOP_MAX_CPS value too low (%"PRIu32") increased to %u",cps,MINCPS);
4889 			cps = MINCPS;
4890 		}
4891 		if (cps > MAXCPS) {
4892 			syslog(LOG_NOTICE,"CHUNKS_LOOP_MAX_CPS value too high (%"PRIu32") decreased to %u",cps,MAXCPS);
4893 			cps = MAXCPS;
4894 		}
4895 		HashCPTMax = ((cps+(TICKSPERSECOND-1))/TICKSPERSECOND);
4896 	}
4897 
4898 	if (DangerMaxLeng != oldDangerMaxLeng) {
4899 		for (j=0 ; j<DANGER_PRIORITIES ; j++) {
4900 			if (chunks_priority_leng[j]>0) {
4901 				for (i=chunks_priority_tail[j] ; i!=chunks_priority_head[j] ; i = (i+1)%oldDangerMaxLeng) {
4902 					if (chunks_priority_queue[j][i]!=NULL) {
4903 						chunks_priority_queue[j][i]->ondangerlist=0;
4904 					}
4905 				}
4906 			}
4907 			free(chunks_priority_queue[j]);
4908 			chunks_priority_queue[j] = (chunk**)malloc(sizeof(chunk*)*DangerMaxLeng);
4909 			passert(chunks_priority_queue[j]);
4910 			for (i=0 ; i<DangerMaxLeng ; i++) {
4911 				chunks_priority_queue[j][i] = NULL;
4912 			}
4913 			chunks_priority_leng[j] = 0;
4914 			chunks_priority_head[j] = 0;
4915 			chunks_priority_tail[j] = 0;
4916 		}
4917 	}
4918 }
4919 
chunk_strinit(void)4920 int chunk_strinit(void) {
4921 	uint32_t i;
4922 	uint32_t j;
4923 	uint32_t cps;
4924 	char *repstr;
4925 
4926 	starttime = main_time();
4927 
4928 	chunk_load_cfg_common();
4929 
4930 	MaxDelSoftLimit = cfg_getuint32("CHUNKS_SOFT_DEL_LIMIT",10);
4931 	if (cfg_isdefined("CHUNKS_HARD_DEL_LIMIT")) {
4932 		MaxDelHardLimit = cfg_getuint32("CHUNKS_HARD_DEL_LIMIT",25);
4933 		if (MaxDelHardLimit<MaxDelSoftLimit) {
4934 			MaxDelSoftLimit = MaxDelHardLimit;
4935 			fprintf(stderr,"CHUNKS_SOFT_DEL_LIMIT is greater than CHUNKS_HARD_DEL_LIMIT - using CHUNKS_HARD_DEL_LIMIT for both\n");
4936 		}
4937 	} else {
4938 		MaxDelHardLimit = 3 * MaxDelSoftLimit;
4939 	}
4940 	if (MaxDelSoftLimit==0) {
4941 		fprintf(stderr,"delete limit is zero !!!\n");
4942 		return -1;
4943 	}
4944 	TmpMaxDelFrac = MaxDelSoftLimit;
4945 	TmpMaxDel = MaxDelSoftLimit;
4946 
4947 	repstr = cfg_getstr("CHUNKS_WRITE_REP_LIMIT","2,1,1,4");
4948 	switch (chunk_parse_rep_list(repstr,MaxWriteRepl)) {
4949 		case -1:
4950 			fprintf(stderr,"write replication limit parse error !!!\n");
4951 			return -1;
4952 		case 0:
4953 			fprintf(stderr,"write replication limit in old format - change limits to new format\n");
4954 	}
4955 	free(repstr);
4956 	repstr = cfg_getstr("CHUNKS_READ_REP_LIMIT","10,5,2,5");
4957 	switch (chunk_parse_rep_list(repstr,MaxReadRepl)) {
4958 		case -1:
4959 			fprintf(stderr,"read replication limit parse error !!!\n");
4960 			return -1;
4961 		case 0:
4962 			fprintf(stderr,"read replication limit in old format - change limits to new format\n");
4963 	}
4964 	free(repstr);
4965 /*
4966 	MaxWriteRepl = cfg_getuint32("CHUNKS_WRITE_REP_LIMIT",2);
4967 	MaxReadRepl = cfg_getuint32("CHUNKS_READ_REP_LIMIT",10);
4968 	if (MaxReadRepl==0) {
4969 		fprintf(stderr,"read replication limit is zero !!!\n");
4970 		return -1;
4971 	}
4972 	if (MaxWriteRepl==0) {
4973 		fprintf(stderr,"write replication limit is zero !!!\n");
4974 		return -1;
4975 	}
4976 */
4977 	if (cfg_isdefined("CHUNKS_LOOP_TIME")) {
4978 		fprintf(stderr,"Defining loop time by CHUNKS_LOOP_TIME option is deprecated - use CHUNKS_LOOP_MAX_CPS and CHUNKS_LOOP_MIN_TIME\n");
4979 		LoopTimeMin = cfg_getuint32("CHUNKS_LOOP_TIME",300); // deprecated option
4980 		if (LoopTimeMin < MINLOOPTIME) {
4981 			fprintf(stderr,"CHUNKS_LOOP_TIME value too low (%"PRIu32") increased to %u\n",LoopTimeMin,MINLOOPTIME);
4982 			LoopTimeMin = MINLOOPTIME;
4983 		}
4984 		if (LoopTimeMin > MAXLOOPTIME) {
4985 			fprintf(stderr,"CHUNKS_LOOP_TIME value too high (%"PRIu32") decreased to %u\n",LoopTimeMin,MAXLOOPTIME);
4986 			LoopTimeMin = MAXLOOPTIME;
4987 		}
4988 //		HashSteps = 1+((HASHSIZE)/(LoopTimeMin*TICKSPERSECOND));
4989 		HashCPTMax = 0xFFFFFFFF;
4990 	} else {
4991 		LoopTimeMin = cfg_getuint32("CHUNKS_LOOP_MIN_TIME",300);
4992 		if (LoopTimeMin < MINLOOPTIME) {
4993 			fprintf(stderr,"CHUNKS_LOOP_MIN_TIME value too low (%"PRIu32") increased to %u\n",LoopTimeMin,MINLOOPTIME);
4994 			LoopTimeMin = MINLOOPTIME;
4995 		}
4996 		if (LoopTimeMin > MAXLOOPTIME) {
4997 			fprintf(stderr,"CHUNKS_LOOP_MIN_TIME value too high (%"PRIu32") decreased to %u\n",LoopTimeMin,MAXLOOPTIME);
4998 			LoopTimeMin = MAXLOOPTIME;
4999 		}
5000 //		HashSteps = 1+((HASHSIZE)/(LoopTimeMin*TICKSPERSECOND));
5001 		cps = cfg_getuint32("CHUNKS_LOOP_MAX_CPS",100000);
5002 		if (cps < MINCPS) {
5003 			fprintf(stderr,"CHUNKS_LOOP_MAX_CPS value too low (%"PRIu32") increased to %u\n",cps,MINCPS);
5004 			cps = MINCPS;
5005 		}
5006 		if (cps > MAXCPS) {
5007 			fprintf(stderr,"CHUNKS_LOOP_MAX_CPS value too high (%"PRIu32") decreased to %u\n",cps,MAXCPS);
5008 			cps = MAXCPS;
5009 		}
5010 		HashCPTMax = ((cps+(TICKSPERSECOND-1))/TICKSPERSECOND);
5011 	}
5012 
5013 	chunk_hash_init();
5014 //	for (i=0 ; i<HASHSIZE ; i++) {
5015 //		chunkhash[i]=NULL;
5016 //	}
5017 	cstab = malloc(sizeof(csdata)*MAXCSCOUNT);
5018 	passert(cstab);
5019 	for (i=0 ; i<MAXCSCOUNT ; i++) {
5020 		cstab[i].next = i+1;
5021 		cstab[i].prev = i-1;
5022 		cstab[i].opchunks = NULL;
5023 		cstab[i].valid = 0;
5024 		cstab[i].registered = 0;
5025 		cstab[i].mfr_state = UNKNOWN_HARD;
5026 		cstab[i].newchunkdelay = 0;
5027 		cstab[i].lostchunkdelay = 0;
5028 	}
5029 	cstab[0].prev = MAXCSCOUNT;
5030 	csfreehead = 0;
5031 	csfreetail = MAXCSCOUNT-1;
5032 	csusedhead = MAXCSCOUNT;
5033 	allchunkcounts = malloc(sizeof(uint32_t*)*MAXSCLASS*2);
5034 	passert(allchunkcounts);
5035 	regularchunkcounts = malloc(sizeof(uint32_t*)*MAXSCLASS*2);
5036 	passert(regularchunkcounts);
5037 	for (i=0 ; i<MAXSCLASS*2 ; i++) {
5038 		allchunkcounts[i] = malloc(sizeof(uint32_t)*11);
5039 		passert(allchunkcounts[i]);
5040 		regularchunkcounts[i] = malloc(sizeof(uint32_t)*11);
5041 		passert(regularchunkcounts[i]);
5042 		for (j=0 ; j<11 ; j++) {
5043 			allchunkcounts[i][j]=0;
5044 			regularchunkcounts[i][j]=0;
5045 		}
5046 	}
5047 	jobshpos = 0;
5048 	jobshstep = 1;
5049 	jobshcnt = 0;
5050 	jobshmax = 0;
5051 	for (j=0 ; j<DANGER_PRIORITIES ; j++) {
5052 		chunks_priority_queue[j] = (chunk**)malloc(sizeof(chunk*)*DangerMaxLeng);
5053 		passert(chunks_priority_queue[j]);
5054 		for (i=0 ; i<DangerMaxLeng ; i++) {
5055 			chunks_priority_queue[j][i] = NULL;
5056 		}
5057 		chunks_priority_leng[j] = 0;
5058 		chunks_priority_head[j] = 0;
5059 		chunks_priority_tail[j] = 0;
5060 	}
5061 	chunk_do_jobs(NULL,JOBS_INIT,0,main_time(),0);	// clear chunk loop internal data, and allocate tabs
5062 	chunk_priority_queue_check(NULL,0); // allocate 'servers' tab
5063 
5064 #ifdef MFSDEBUG
5065 	mfsdebug(NULL);
5066 	main_time_register(1,0,mfsdebug_flush);
5067 #endif
5068 
5069 	main_reload_register(chunk_reload);
5070 	// main_time_register(1,0,chunk_jobs_main);
5071 	main_msectime_register(1000/TICKSPERSECOND,0,chunk_jobs_main);
5072 	main_destruct_register(chunk_term);
5073 	return 1;
5074 }
5075