1 /*
2  * Copyright (C) 2016 Jakub Kruszona-Zawadzki, Core Technology Sp. z o.o.
3  *
4  * This file is part of MooseFS.
5  *
6  * MooseFS is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation, version 2 (only).
9  *
10  * MooseFS is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with MooseFS; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02111-1301, USA
18  * or visit http://www.gnu.org/licenses/gpl-2.0.html
19  */
20 
21 #ifdef HAVE_CONFIG_H
22 #include "config.h"
23 #endif
24 
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <inttypes.h>
28 
29 #include "MFSCommunication.h"
30 
31 #include "csdb.h"
32 #include "bio.h"
33 #include "changelog.h"
34 #include "datapack.h"
35 #include "slogger.h"
36 #include "hashfn.h"
37 #include "massert.h"
38 #include "matocsserv.h"
39 #include "cfg.h"
40 #include "main.h"
41 #include "metadata.h"
42 
/* Operation codes written into CSDBOP changelog entries and dispatched on by csdb_mr_op(). */
enum {
	CSDB_OP_ADD = 0,
	CSDB_OP_DEL = 1,
	CSDB_OP_NEWIPPORT = 2,
	CSDB_OP_NEWID = 3,
	CSDB_OP_MAINTENANCEON = 4,
	CSDB_OP_MAINTENANCEOFF = 5
};

/* Heavy-load detection settings; assigned from the configuration in csdb_reload(). */
static uint32_t HeavyLoadGracePeriod;
static uint32_t HeavyLoadThreshold;
static double HeavyLoadRatioThreshold;

/* Bucket count of the (ip,port) hash table and the macro selecting a bucket. */
#define CSDBHASHSIZE 256
#define CSDBHASHFN(ip,port) (hash32((ip)^((port)<<16))%(CSDBHASHSIZE))
56 
/* One chunkserver known to the master. */
struct csdbentry {
	uint32_t ip;			// IPv4 address (most significant byte is the first octet)
	uint16_t port;			// together with ip forms the hash key
	uint16_t csid;			// chunkserver id; 0 means no id assigned yet
	uint16_t number;		// 1-based position assigned by csdb_sort_servers, 0 if none
	uint32_t heavyloadts;		// last timestamp of heavy load state (load > thresholds)
	uint32_t load;			// last value passed to csdb_server_load
	uint8_t maintenance;		// 1 when the server is in maintenance mode, 0 otherwise
	void *eptr;			// opaque connection handle; NULL while disconnected
	struct csdbentry *next;		// next entry in the same hash bucket
};

typedef struct csdbentry csdbentry;
68 
69 static csdbentry *csdbhash[CSDBHASHSIZE];
70 static csdbentry **csdbtab;
71 static uint32_t nextid;
72 static uint32_t disconnected_servers;
73 static uint32_t disconnected_servers_in_maintenance;
74 static uint32_t servers;
75 static uint32_t disconnecttime;
76 static uint32_t loadsum;
77 
csdb_disconnectcheck(void)78 void csdb_disconnectcheck(void) {
79 	static uint8_t laststate=0;
80 	if (disconnected_servers && laststate==0) {
81 		disconnecttime = main_time();
82 		laststate = 1;
83 	} else if (disconnected_servers==0) {
84 		disconnecttime = 0;
85 		laststate = 0;
86 	}
87 }
88 
csdb_newid(void)89 uint16_t csdb_newid(void) {
90 	while (nextid<65536 && csdbtab[nextid]!=NULL) {
91 		nextid++;
92 	}
93 	return nextid;
94 }
95 
csdb_delid(uint16_t csid)96 void csdb_delid(uint16_t csid) {
97 	csdbtab[csid] = NULL;
98 	if (csid<nextid) {
99 		nextid = csid;
100 	}
101 }
102 
/*
 * Formats 'ip' as dotted-decimal text into 'strip' (at least 16 bytes),
 * most significant byte first. The result is always NUL-terminated.
 */
static inline void csdb_makestrip(char strip[16],uint32_t ip) {
	const uint8_t oct1 = (uint8_t)(ip>>24);
	const uint8_t oct2 = (uint8_t)(ip>>16);
	const uint8_t oct3 = (uint8_t)(ip>>8);
	const uint8_t oct4 = (uint8_t)ip;

	snprintf(strip,16,"%"PRIu8".%"PRIu8".%"PRIu8".%"PRIu8,oct1,oct2,oct3,oct4);
	strip[15] = '\0';
}
107 
csdb_new_connection(uint32_t ip,uint16_t port,uint16_t csid,void * eptr)108 void* csdb_new_connection(uint32_t ip,uint16_t port,uint16_t csid,void *eptr) {
109 	uint32_t hash,hashid;
110 	csdbentry *csptr,**cspptr,*csidptr;
111 	char strip[16];
112 	char strtmpip[16];
113 
114 	csdb_makestrip(strip,ip);
115 	if (csid>0) {
116 		csidptr = csdbtab[csid];
117 	} else {
118 		csidptr = NULL;
119 	}
120 	if (csidptr && csidptr->ip == ip && csidptr->port == port) { // fast find using csid
121 		if (csidptr->eptr!=NULL) {
122 			syslog(LOG_NOTICE,"csdb: found cs using ip:port and csid (%s:%"PRIu16",%"PRIu16"), but server is still connected",strip,port,csid);
123 			return NULL;
124 		}
125 		csidptr->eptr = eptr;
126 		disconnected_servers--;
127 		if (csidptr->maintenance) {
128 			disconnected_servers_in_maintenance--;
129 		}
130 		syslog(LOG_NOTICE,"csdb: found cs using ip:port and csid (%s:%"PRIu16",%"PRIu16")",strip,port,csid);
131 		return csidptr;
132 	}
133 	hash = CSDBHASHFN(ip,port);
134 	for (csptr = csdbhash[hash] ; csptr ; csptr = csptr->next) { // slow find using (ip+port)
135 		if (csptr->ip == ip && csptr->port == port) {
136 			if (csptr->eptr!=NULL) {
137 				syslog(LOG_NOTICE,"csdb: found cs using ip:port (%s:%"PRIu16",%"PRIu16"), but server is still connected",strip,port,csid);
138 					return NULL;
139 			}
140 			csptr->eptr = eptr;
141 			disconnected_servers--;
142 			if (csptr->maintenance) {
143 				disconnected_servers_in_maintenance--;
144 			}
145 			return csptr;
146 		}
147 	}
148 	if (csidptr && csidptr->eptr==NULL) { // ip+port not found, but found csid - change ip+port
149 		csdb_makestrip(strtmpip,csidptr->ip);
150 			syslog(LOG_NOTICE,"csdb: found cs using csid (%s:%"PRIu16",%"PRIu16") - previous ip:port (%s:%"PRIu16")",strip,port,csid,strtmpip,csidptr->port);
151 			hashid = CSDBHASHFN(csidptr->ip,csidptr->port);
152 			cspptr = csdbhash + hashid;
153 			while ((csptr=*cspptr)) {
154 				if (csptr == csidptr) {
155 					*cspptr = csptr->next;
156 					csptr->next = csdbhash[hash];
157 					csdbhash[hash] = csptr;
158 					break;
159 				} else {
160 					cspptr = &(csptr->next);
161 				}
162 			}
163 			csidptr->ip = ip;
164 			csidptr->port = port;
165 			changelog("%"PRIu32"|CSDBOP(%u,%"PRIu32",%"PRIu16",%"PRIu16")",main_time(),CSDB_OP_NEWIPPORT,ip,port,csidptr->csid);
166 		csidptr->eptr = eptr;
167 		disconnected_servers--;
168 		if (csidptr->maintenance) {
169 			disconnected_servers_in_maintenance--;
170 		}
171 		return csidptr;
172 	}
173 	syslog(LOG_NOTICE,"csdb: server not found (%s:%"PRIu16",%"PRIu16"), add it to database",strip,port,csid);
174 	csptr = malloc(sizeof(csdbentry));
175 	passert(csptr);
176 	csptr->ip = ip;
177 	csptr->port = port;
178 	if (csid>0) {
179 		if (csdbtab[csid]==NULL) {
180 			csdbtab[csid] = csptr;
181 		} else {
182 			csid = 0;
183 		}
184 	}
185 	csptr->csid = csid;
186 	csptr->heavyloadts = 0;
187 	csptr->maintenance = 0;
188 	csptr->load = 0;
189 	csptr->eptr = eptr;
190 	csptr->next = csdbhash[hash];
191 	csdbhash[hash] = csptr;
192 	servers++;
193 	changelog("%"PRIu32"|CSDBOP(%u,%"PRIu32",%"PRIu16",%"PRIu16")",main_time(),CSDB_OP_ADD,ip,port,csptr->csid);
194 	return csptr;
195 }
196 
csdb_lost_connection(void * v_csptr)197 void csdb_lost_connection(void *v_csptr) {
198 	csdbentry *csptr = (csdbentry*)v_csptr;
199 	if (csptr!=NULL) {
200 		csptr->eptr = NULL;
201 		disconnected_servers++;
202 		if (csptr->maintenance) {
203 			disconnected_servers_in_maintenance++;
204 		}
205 	}
206 	csdb_disconnectcheck();
207 }
208 
csdb_server_load(void * v_csptr,uint32_t load)209 void csdb_server_load(void *v_csptr,uint32_t load) {
210 	csdbentry *csptr = (csdbentry*)v_csptr;
211 	double loadavg;
212 	char strip[16];
213 	loadsum -= csptr->load;
214 	if (servers>1) {
215 		loadavg = loadsum / (servers-1);
216 	} else {
217 		loadavg = load;
218 	}
219 	csptr->load = load;
220 	loadsum += load;
221 	if (load>HeavyLoadThreshold && load>loadavg*HeavyLoadRatioThreshold) { // cs is in 'heavy load state'
222 		csdb_makestrip(strip,csptr->ip);
223 		syslog(LOG_NOTICE,"Heavy load server detected (%s:%u); load: %"PRIu32" ; threshold: %"PRIu32" ; loadavg (without this server): %.2lf ; ratio_threshold: %.2lf",strip,csptr->port,csptr->load,HeavyLoadThreshold,loadavg,HeavyLoadRatioThreshold);
224 		csptr->heavyloadts = main_time();
225 	}
226 }
227 
228 
csdb_get_csid(void * v_csptr)229 uint16_t csdb_get_csid(void *v_csptr) {
230 	csdbentry *csptr = (csdbentry*)v_csptr;
231 	char strip[16];
232 	if (csptr->csid==0) {
233 		csptr->csid = csdb_newid();
234 		csdbtab[csptr->csid] = csptr;
235 		changelog("%"PRIu32"|CSDBOP(%u,%"PRIu32",%"PRIu16",%"PRIu16")",main_time(),CSDB_OP_NEWID,csptr->ip,csptr->port,csptr->csid);
236 		csdb_makestrip(strip,csptr->ip);
237 		syslog(LOG_NOTICE,"csdb: generate new server id for (%s:%"PRIu16"): %"PRIu16,strip,csptr->port,csptr->csid);
238 	}
239 	return csptr->csid;
240 }
241 
csdb_server_is_overloaded(void * v_csptr,uint32_t now)242 uint8_t csdb_server_is_overloaded(void *v_csptr,uint32_t now) {
243 	csdbentry *csptr = (csdbentry*)v_csptr;
244 	return (csptr->heavyloadts+HeavyLoadGracePeriod<=now)?0:1;
245 }
246 
csdb_server_is_being_maintained(void * v_csptr)247 uint8_t csdb_server_is_being_maintained(void *v_csptr) {
248 	csdbentry *csptr = (csdbentry*)v_csptr;
249 	return csptr->maintenance;
250 }
251 
csdb_servlist_size(void)252 uint32_t csdb_servlist_size(void) {
253 	uint32_t hash;
254 	csdbentry *csptr;
255 	uint32_t i;
256 	i=0;
257 	for (hash=0 ; hash<CSDBHASHSIZE ; hash++) {
258 		for (csptr = csdbhash[hash] ; csptr ; csptr = csptr->next) {
259 			i++;
260 		}
261 	}
262 	return i*(4+4+2+2+8+8+4+8+8+4+4+4+4);
263 }
264 
csdb_servlist_data(uint8_t * ptr)265 void csdb_servlist_data(uint8_t *ptr) {
266 	uint32_t hash;
267 	uint32_t now = main_time();
268 	uint32_t gracetime;
269 	uint8_t *p;
270 	csdbentry *csptr;
271 	for (hash=0 ; hash<CSDBHASHSIZE ; hash++) {
272 		for (csptr = csdbhash[hash] ; csptr ; csptr = csptr->next) {
273 			if (csptr->heavyloadts+HeavyLoadGracePeriod>now) {
274 				gracetime = csptr->heavyloadts+HeavyLoadGracePeriod-now; // seconds to be turned back to work
275 			} else {
276 				gracetime = 0; // Server is working properly and never was in heavy load state
277 			}
278 			p = ptr;
279 			if (csptr->eptr) {
280 				uint32_t version,chunkscount,tdchunkscount,errorcounter,load;
281 				uint64_t usedspace,totalspace,tdusedspace,tdtotalspace;
282 				matocsserv_getservdata(csptr->eptr,&version,&usedspace,&totalspace,&chunkscount,&tdusedspace,&tdtotalspace,&tdchunkscount,&errorcounter,&load);
283 				put32bit(&ptr,version&0xFFFFFF);
284 				put32bit(&ptr,csptr->ip);
285 				put16bit(&ptr,csptr->port);
286 				put16bit(&ptr,csptr->csid);
287 				put64bit(&ptr,usedspace);
288 				put64bit(&ptr,totalspace);
289 				put32bit(&ptr,chunkscount);
290 				put64bit(&ptr,tdusedspace);
291 				put64bit(&ptr,tdtotalspace);
292 				put32bit(&ptr,tdchunkscount);
293 				put32bit(&ptr,errorcounter);
294 				put32bit(&ptr,load);
295 				put32bit(&ptr,gracetime);
296 			} else {
297 				put32bit(&ptr,0x01000000);
298 				put32bit(&ptr,csptr->ip);
299 				put16bit(&ptr,csptr->port);
300 				put16bit(&ptr,csptr->csid);
301 				put64bit(&ptr,0);
302 				put64bit(&ptr,0);
303 				put32bit(&ptr,0);
304 				put64bit(&ptr,0);
305 				put64bit(&ptr,0);
306 				put32bit(&ptr,0);
307 				put32bit(&ptr,0);
308 				put32bit(&ptr,0);
309 				put32bit(&ptr,gracetime);
310 			}
311 			if (csptr->maintenance) {
312 				*p |= 2;
313 			}
314 		}
315 	}
316 }
317 
csdb_remove_server(uint32_t ip,uint16_t port)318 uint8_t csdb_remove_server(uint32_t ip,uint16_t port) {
319 	uint32_t hash;
320 	csdbentry *csptr,**cspptr;
321 
322 	hash = CSDBHASHFN(ip,port);
323 	cspptr = csdbhash + hash;
324 	while ((csptr=*cspptr)) {
325 		if (csptr->ip == ip && csptr->port == port) {
326 			if (csptr->eptr!=NULL) {
327 				return ERROR_ACTIVE;
328 			}
329 			if (csptr->csid>0) {
330 				csdb_delid(csptr->csid);
331 			}
332 			if (csptr->maintenance) {
333 				disconnected_servers_in_maintenance--;
334 			}
335 			*cspptr = csptr->next;
336 			free(csptr);
337 			servers--;
338 			disconnected_servers--;
339 			changelog("%"PRIu32"|CSDBOP(%u,%"PRIu32",%"PRIu16",0)",main_time(),CSDB_OP_DEL,ip,port);
340 			return STATUS_OK;
341 		} else {
342 			cspptr = &(csptr->next);
343 		}
344 	}
345 	return ERROR_NOTFOUND;
346 }
347 
csdb_mr_op(uint8_t op,uint32_t ip,uint16_t port,uint16_t csid)348 uint8_t csdb_mr_op(uint8_t op,uint32_t ip,uint16_t port, uint16_t csid) {
349 	uint32_t hash,hashid;
350 	csdbentry *csptr,**cspptr,*csidptr;
351 
352 	switch (op) {
353 		case CSDB_OP_ADD:
354 			hash = CSDBHASHFN(ip,port);
355 			for (csptr = csdbhash[hash] ; csptr ; csptr = csptr->next) {
356 				if (csptr->ip == ip && csptr->port == port) {
357 					return ERROR_MISMATCH;
358 				}
359 			}
360 			if (csid>0 && csdbtab[csid]!=NULL) {
361 				return ERROR_MISMATCH;
362 			}
363 			csptr = malloc(sizeof(csdbentry));
364 			passert(csptr);
365 			csptr->ip = ip;
366 			csptr->port = port;
367 			csptr->csid = csid;
368 			csdbtab[csid] = csptr;
369 			csptr->heavyloadts = 0;
370 			csptr->maintenance = 0;
371 			csptr->load = 0;
372 			csptr->eptr = NULL;
373 			csptr->next = csdbhash[hash];
374 			csdbhash[hash] = csptr;
375 			servers++;
376 			disconnected_servers++;
377 			meta_version_inc();
378 			return STATUS_OK;
379 		case CSDB_OP_DEL:
380 			hash = CSDBHASHFN(ip,port);
381 			cspptr = csdbhash + hash;
382 			while ((csptr=*cspptr)) {
383 				if (csptr->ip == ip && csptr->port == port) {
384 					if (csptr->eptr!=NULL) {
385 						return ERROR_MISMATCH;
386 					}
387 					if (csptr->csid>0) {
388 						csdb_delid(csptr->csid);
389 					}
390 					if (csptr->maintenance) {
391 						disconnected_servers_in_maintenance--;
392 					}
393 					*cspptr = csptr->next;
394 					free(csptr);
395 					servers--;
396 					disconnected_servers--;
397 					meta_version_inc();
398 					return STATUS_OK;
399 				} else {
400 					cspptr = &(csptr->next);
401 				}
402 			}
403 			return ERROR_MISMATCH;
404 		case CSDB_OP_NEWIPPORT:
405 			if (csid==0 || csdbtab[csid]==NULL) {
406 				return ERROR_MISMATCH;
407 			}
408 			csidptr = csdbtab[csid];
409 
410 			hashid = CSDBHASHFN(csidptr->ip,csidptr->port);
411 			hash = CSDBHASHFN(ip,port);
412 			cspptr = csdbhash + hashid;
413 			while ((csptr=*cspptr)) {
414 				if (csptr == csidptr) {
415 					*cspptr = csptr->next;
416 					csptr->next = csdbhash[hash];
417 					csdbhash[hash] = csptr;
418 					break;
419 				} else {
420 					cspptr = &(csptr->next);
421 				}
422 			}
423 			if (csptr==NULL) {
424 				return ERROR_MISMATCH;
425 			}
426 			csptr->ip = ip;
427 			csptr->port = port;
428 			meta_version_inc();
429 			return STATUS_OK;
430 		case CSDB_OP_NEWID:
431 			if (csid==0 || csdbtab[csid]!=NULL) {
432 				return ERROR_MISMATCH;
433 			}
434 			hash = CSDBHASHFN(ip,port);
435 			for (csptr = csdbhash[hash] ; csptr ; csptr = csptr->next) {
436 				if (csptr->ip == ip && csptr->port == port) {
437 					if (csptr->csid!=csid) {
438 						if (csptr->csid>0) {
439 							csdb_delid(csptr->csid);
440 						}
441 						csptr->csid = csid;
442 						csdbtab[csid] = csptr;
443 					}
444 					meta_version_inc();
445 					return STATUS_OK;
446 				}
447 			}
448 			return ERROR_MISMATCH;
449 		case CSDB_OP_MAINTENANCEON:
450 			hash = CSDBHASHFN(ip,port);
451 			for (csptr = csdbhash[hash] ; csptr ; csptr = csptr->next) {
452 				if (csptr->ip == ip && csptr->port == port) {
453 					if (csptr->maintenance!=0) {
454 						return ERROR_MISMATCH;
455 					}
456 					csptr->maintenance = 1;
457 					if (csptr->eptr==NULL) {
458 						disconnected_servers_in_maintenance++;
459 					}
460 					meta_version_inc();
461 					return STATUS_OK;
462 				}
463 			}
464 			return ERROR_MISMATCH;
465 		case CSDB_OP_MAINTENANCEOFF:
466 			hash = CSDBHASHFN(ip,port);
467 			for (csptr = csdbhash[hash] ; csptr ; csptr = csptr->next) {
468 				if (csptr->ip == ip && csptr->port == port) {
469 					if (csptr->maintenance!=1) {
470 						return ERROR_MISMATCH;
471 					}
472 					csptr->maintenance = 0;
473 					if (csptr->eptr==NULL) {
474 						disconnected_servers_in_maintenance--;
475 					}
476 					meta_version_inc();
477 					return STATUS_OK;
478 				}
479 			}
480 			return ERROR_MISMATCH;
481 	}
482 	return ERROR_MISMATCH;
483 }
484 
csdb_back_to_work(uint32_t ip,uint16_t port)485 uint8_t csdb_back_to_work(uint32_t ip,uint16_t port) {
486 	uint32_t hash;
487 	csdbentry *csptr;
488 
489 	hash = CSDBHASHFN(ip,port);
490 	for (csptr = csdbhash[hash] ; csptr ; csptr = csptr->next) {
491 		if (csptr->ip == ip && csptr->port == port) {
492 			csptr->heavyloadts = 0;
493 			return STATUS_OK;
494 		}
495 	}
496 	return ERROR_NOTFOUND;
497 }
498 
csdb_maintenance(uint32_t ip,uint16_t port,uint8_t onoff)499 uint8_t csdb_maintenance(uint32_t ip,uint16_t port,uint8_t onoff) {
500 	uint32_t hash;
501 	csdbentry *csptr;
502 
503 	if (onoff!=0 && onoff!=1) {
504 		return ERROR_EINVAL;
505 	}
506 	hash = CSDBHASHFN(ip,port);
507 	for (csptr = csdbhash[hash] ; csptr ; csptr = csptr->next) {
508 		if (csptr->ip == ip && csptr->port == port) {
509 			if (csptr->maintenance!=onoff) {
510 				csptr->maintenance = onoff;
511 				if (onoff) {
512 					changelog("%"PRIu32"|CSDBOP(%u,%"PRIu32",%"PRIu16",0)",main_time(),CSDB_OP_MAINTENANCEON,ip,port);
513 				} else {
514 					changelog("%"PRIu32"|CSDBOP(%u,%"PRIu32",%"PRIu16",0)",main_time(),CSDB_OP_MAINTENANCEOFF,ip,port);
515 				}
516 				if (csptr->eptr==NULL) {
517 					if (onoff) {
518 						disconnected_servers_in_maintenance++;
519 					} else {
520 						disconnected_servers_in_maintenance--;
521 					}
522 				}
523 			}
524 			return STATUS_OK;
525 		}
526 	}
527 	return ERROR_NOTFOUND;
528 }
529 
530 /*
531 uint8_t csdb_find(uint32_t ip,uint16_t port,uint16_t csid) {
532 	uint32_t hash;
533 	csdbentry *csptr;
534 
535 	hash = CSDBHASHFN(ip,port);
536 	for (csptr = csdbhash[hash] ; csptr ; csptr = csptr->next) {
537 		if (csptr->ip == ip && csptr->port == port) {
538 			return 1;
539 		}
540 	}
541 	if (csid>0 && csdbtab[csid]!=NULL) {
542 		return 2;
543 	}
544 	return 0;
545 }
546 */
547 
csdb_have_all_servers(void)548 uint8_t csdb_have_all_servers(void) {
549 	return (disconnected_servers>0)?0:1;
550 }
551 
csdb_have_more_than_half_servers(void)552 uint8_t csdb_have_more_than_half_servers(void) {
553 	return ((servers==0)||(disconnected_servers<((servers+1)/2)))?1:0;
554 }
555 
csdb_replicate_undergoals(void)556 uint8_t csdb_replicate_undergoals(void) {
557 	return (disconnected_servers>0 && disconnected_servers==disconnected_servers_in_maintenance)?0:1;
558 }
559 
csdb_compare(const void * a,const void * b)560 int csdb_compare(const void *a,const void *b) {
561 	const csdbentry *aa = *((const csdbentry**)a);
562 	const csdbentry *bb = *((const csdbentry**)b);
563 	if (aa->ip < bb->ip) {
564 		return -1;
565 	} else if (aa->ip > bb->ip) {
566 		return 1;
567 	} else if (aa->port < bb->port) {
568 		return -1;
569 	} else if (aa->port > bb->port) {
570 		return 1;
571 	}
572 	return 0;
573 }
574 
csdb_sort_servers(void)575 uint16_t csdb_sort_servers(void) {
576 	csdbentry **stab,*csptr;
577 	uint32_t i,hash;
578 
579 	stab = malloc(sizeof(csdbentry*)*servers);
580 	i = 0;
581 	for (hash=0 ; hash<CSDBHASHSIZE ; hash++) {
582 		for (csptr = csdbhash[hash] ; csptr ; csptr = csptr->next) {
583 			if (i<servers) {
584 				stab[i] = csptr;
585 				i++;
586 			} else {
587 				syslog(LOG_WARNING,"internal error: wrong chunk servers count !!!");
588 				csptr->number = 0;
589 			}
590 		}
591 	}
592 	qsort(stab,servers,sizeof(csdbentry*),csdb_compare);
593 	for (i=0 ; i<servers ; i++) {
594 		stab[i]->number = i+1;
595 	}
596 	free(stab);
597 
598 	return servers;
599 }
600 
csdb_getnumber(void * v_csptr)601 uint16_t csdb_getnumber(void *v_csptr) {
602 	csdbentry *csptr = (csdbentry*)v_csptr;
603 	if (csptr!=NULL) {
604 		return csptr->number;
605 	}
606 	return 0;
607 }
608 
/*
 * Writes the database to 'fd': a 32-bit entry count followed by one 9-byte
 * record per entry (ip:32 port:16 csid:16 maintenance:8), flushed in batches
 * of at most 100 records.
 *
 * Returns 0 on success and 0xFF on a write error. When called with fd==NULL
 * nothing is written and 0x12 is returned - presumably the format version of
 * this section (csdb_load reads 9-byte records for mver>=0x12); confirm
 * against the caller.
 */
uint8_t csdb_store(bio *fd) {
	enum { RECSIZE = 9, BATCH = 100 };
	uint8_t wbuff[RECSIZE*BATCH],*wptr;
	uint32_t bucket,total,pending;
	csdbentry *entry;

	if (fd==NULL) {
		return 0x12;
	}
	total = 0;
	for (bucket=0 ; bucket<CSDBHASHSIZE ; bucket++) {
		for (entry = csdbhash[bucket] ; entry!=NULL ; entry = entry->next) {
			total++;
		}
	}
	wptr = wbuff;
	put32bit(&wptr,total);
	if (bio_write(fd,wbuff,4)!=4) {
		syslog(LOG_NOTICE,"write error");
		return 0xFF;
	}
	pending = 0;
	wptr = wbuff;
	for (bucket=0 ; bucket<CSDBHASHSIZE ; bucket++) {
		for (entry = csdbhash[bucket] ; entry!=NULL ; entry = entry->next) {
			put32bit(&wptr,entry->ip);
			put16bit(&wptr,entry->port);
			put16bit(&wptr,entry->csid);
			put8bit(&wptr,entry->maintenance);
			pending++;
			if (pending==BATCH) { // buffer full - flush it
				if (bio_write(fd,wbuff,RECSIZE*BATCH)!=(RECSIZE*BATCH)) {
					syslog(LOG_NOTICE,"write error");
					return 0xFF;
				}
				pending = 0;
				wptr = wbuff;
			}
		}
	}
	if (pending>0) {
		if (bio_write(fd,wbuff,RECSIZE*pending)!=(RECSIZE*pending)) {
			syslog(LOG_NOTICE,"write error");
			return 0xFF;
		}
	}
	return 0;
}
657 
csdb_load(bio * fd,uint8_t mver,int ignoreflag)658 int csdb_load(bio *fd,uint8_t mver,int ignoreflag) {
659 	uint8_t rbuff[9*100];
660 	const uint8_t *ptr;
661 	csdbentry *csptr;
662 	uint32_t hash;
663 	uint32_t l,t,ip;
664 	uint16_t port,csid;
665 	uint8_t maintenance;
666 	uint8_t nl=1;
667 	uint32_t bsize;
668 
669 	if (bio_read(fd,rbuff,4)!=4) {
670 		int err = errno;
671 		if (nl) {
672 			fputc('\n',stderr);
673 			// nl=0;
674 		}
675 		errno = err;
676 		mfs_errlog(LOG_ERR,"loading chunkservers: read error");
677 		return -1;
678 	}
679 	ptr=rbuff;
680 	t = get32bit(&ptr);
681 	if (mver<=0x10) {
682 		bsize = 6;
683 	} else if (mver<=0x11) {
684 		bsize = 8;
685 	} else {
686 		bsize = 9;
687 	}
688 	l=0;
689 	while (t>0) {
690 		if (l==0) {
691 			if (t>100) {
692 				if (bio_read(fd,rbuff,bsize*100)!=(bsize*100)) {
693 					int err = errno;
694 					if (nl) {
695 						fputc('\n',stderr);
696 						// nl=0;
697 					}
698 					errno = err;
699 					mfs_errlog(LOG_ERR,"loading chunkservers: read error");
700 					return -1;
701 				}
702 				l=100;
703 			} else {
704 				if (bio_read(fd,rbuff,bsize*t)!=(bsize*t)) {
705 					int err = errno;
706 					if (nl) {
707 						fputc('\n',stderr);
708 						// nl=0;
709 					}
710 					errno = err;
711 					mfs_errlog(LOG_ERR,"loading free nodes: read error");
712 					return -1;
713 				}
714 				l=t;
715 			}
716 			ptr = rbuff;
717 		}
718 		ip = get32bit(&ptr);
719 		port = get16bit(&ptr);
720 		if (mver>=0x11) {
721 			csid = get16bit(&ptr);
722 		} else {
723 			csid = 0;
724 		}
725 		if (mver>=0x12) {
726 			maintenance = get8bit(&ptr);
727 		} else {
728 			maintenance = 0;
729 		}
730 		hash = CSDBHASHFN(ip,port);
731 		for (csptr = csdbhash[hash] ; csptr ; csptr = csptr->next) {
732 			if (csptr->ip == ip && csptr->port == port) {
733 				if (nl) {
734 					fputc('\n',stderr);
735 					nl=0;
736 				}
737 				fprintf(stderr,"repeated chunkserver entry (ip:%"PRIu32",port:%"PRIu16")\n",ip,port);
738 				syslog(LOG_ERR,"repeated chunkserver entry (ip:%"PRIu32",port:%"PRIu16")",ip,port);
739 				if (ignoreflag==0) {
740 					fprintf(stderr,"use '-i' option to remove this chunkserver definition");
741 					return -1;
742 				}
743 			}
744 		}
745 		if (csid>0) {
746 			csptr = csdbtab[csid];
747 			if (csptr!=NULL) {
748 				if (nl) {
749 					fputc('\n',stderr);
750 					nl=0;
751 				}
752 				fprintf(stderr,"repeated chunkserver entry (csid:%"PRIu16")\n",csid);
753 				syslog(LOG_ERR,"repeated chunkserver entry (csid:%"PRIu16")",csid);
754 				if (ignoreflag==0) {
755 					fprintf(stderr,"use '-i' option to remove this chunkserver definition");
756 					return -1;
757 				}
758 			}
759 		}
760 		csptr = malloc(sizeof(csdbentry));
761 		passert(csptr);
762 		csptr->ip = ip;
763 		csptr->port = port;
764 		csptr->csid = csid;
765 		if (csid>0) {
766 			csdbtab[csid] = csptr;
767 		}
768 		csptr->number = 0;
769 		csptr->heavyloadts = 0;
770 		csptr->load = 0;
771 		csptr->eptr = NULL;
772 		csptr->maintenance = maintenance;
773 		csptr->next = csdbhash[hash];
774 		csdbhash[hash] = csptr;
775 		servers++;
776 		disconnected_servers++;
777 		if (maintenance) {
778 			disconnected_servers_in_maintenance++;
779 		}
780 		l--;
781 		t--;
782 	}
783 	return 0;
784 }
785 
csdb_cleanup(void)786 void csdb_cleanup(void) {
787 	uint32_t hash;
788 	csdbentry *csptr,*csnptr;
789 
790 	for (hash=0 ; hash<CSDBHASHSIZE ; hash++) {
791 		csptr = csdbhash[hash];
792 		while (csptr) {
793 			csnptr = csptr->next;
794 			free(csptr);
795 			csptr = csnptr;
796 		}
797 		csdbhash[hash]=NULL;
798 	}
799 	for (hash=0 ; hash<65536 ; hash++) {
800 		csdbtab[hash] = NULL;
801 	}
802 	nextid = 1;
803 	disconnected_servers = 0;
804 	disconnected_servers_in_maintenance = 0;
805 	servers = 0;
806 }
807 
csdb_getdisconnecttime(void)808 uint32_t csdb_getdisconnecttime(void) {
809 	return disconnecttime;
810 }
811 
csdb_reload(void)812 void csdb_reload(void) {
813 	HeavyLoadGracePeriod = cfg_getuint32("CS_HEAVY_LOAD_GRACE_PERIOD",900);
814 	HeavyLoadThreshold = cfg_getuint32("CS_HEAVY_LOAD_THRESHOLD",150);
815 	HeavyLoadRatioThreshold = cfg_getdouble("CS_HEAVY_LOAD_RATIO_THRESHOLD",3.0);
816 }
817 
csdb_init(void)818 int csdb_init(void) {
819 	uint32_t hash;
820 	csdb_reload();
821 	for (hash=0 ; hash<CSDBHASHSIZE ; hash++) {
822 		csdbhash[hash]=NULL;
823 	}
824 	csdbtab = malloc(sizeof(csdbentry*)*65536);
825 	passert(csdbtab);
826 	for (hash=0 ; hash<65536 ; hash++) {
827 		csdbtab[hash] = NULL;
828 	}
829 	nextid = 1;
830 	disconnected_servers = 0;
831 	disconnected_servers_in_maintenance = 0;
832 	servers = 0;
833 	disconnecttime = 0;
834 	loadsum = 0;
835 	main_reload_register(csdb_reload);
836 	main_time_register(1,0,csdb_disconnectcheck);
837 	return 0;
838 }
839