1 /**
2 * Copyright (C) 2008 Happy Fish / YuQing
3 *
4 * FastDFS may be copied only under the terms of the GNU General
5 * Public License V3, which may be found in the FastDFS source kit.
6 * Please visit the FastDFS Home Page http://www.fastken.com/ for more detail.
7 **/
8 
9 #include <stdio.h>
10 #include <stdlib.h>
11 #include <string.h>
12 #include <unistd.h>
13 #include <string.h>
14 #include <errno.h>
15 #include <sys/types.h>
16 #include "fastcommon/sockopt.h"
17 #include "fastcommon/logger.h"
18 #include "fastcommon/hash.h"
19 #include "fastcommon/shared_func.h"
20 #include "fastcommon/ini_file_reader.h"
21 #include "fdht_types.h"
22 #include "fdht_proto.h"
23 #include "fdht_global.h"
24 #include "fdht_client.h"
25 
/* Server groups used by the default client; filled by fdht_client_init()
 * and released by fdht_client_destroy(). */
GroupArray g_group_array = {NULL, 0};
/* Keep-alive flag read from the "keep_alive" config item by
 * fdht_client_init(); forced to true when a proxy is in use. */
bool g_keep_alive = false;
28 
/**
 * Redirect every group to the proxy server when proxy mode is enabled.
 *
 * When pGroupArray->use_proxy is set, keep-alive is forced on, the server
 * list is collapsed to a single entry holding a copy of the proxy server,
 * and every server slot of every group is pointed at that entry.
 * Nothing is changed when proxy mode is off.
 *
 * @param pGroupArray  group array to adjust in place
 * @param bKeepAlive   out: set to true when a proxy is used
 */
static void fdht_proxy_extra_deal(GroupArray *pGroupArray, bool *bKeepAlive)
{
	ServerArray *pGroup;
	int group_index;
	int slot;

	if (pGroupArray->use_proxy)
	{
		*bKeepAlive = true;

		/* the proxy becomes the one and only server */
		pGroupArray->server_count = 1;
		memcpy(pGroupArray->servers, &pGroupArray->proxy_server,
				sizeof(FDHTServerInfo));

		for (group_index = 0; group_index < pGroupArray->group_count;
				group_index++)
		{
			pGroup = pGroupArray->groups + group_index;
			for (slot = 0; slot < pGroup->count; slot++)
			{
				pGroup->servers[slot] = pGroupArray->servers;
			}
		}
	}
}
59 
fdht_client_init(const char * filename)60 int fdht_client_init(const char *filename)
61 {
62 	char *pBasePath;
63 	IniContext iniContext;
64 	char szProxyPrompt[64];
65 	int result;
66 
67 	memset(&iniContext, 0, sizeof(IniContext));
68 	if ((result=iniLoadFromFile(filename, &iniContext)) != 0)
69 	{
70 		logError("load conf file \"%s\" fail, ret code: %d", \
71 			filename, result);
72 		return result;
73 	}
74 
75 	//iniPrintItems(&iniContext);
76 
77 	while (1)
78 	{
79 		pBasePath = iniGetStrValue(NULL, "base_path", &iniContext);
80 		if (pBasePath == NULL)
81 		{
82 			logError("conf file \"%s\" must have item " \
83 				"\"base_path\"!", filename);
84 			result = ENOENT;
85 			break;
86 		}
87 
88 		snprintf(g_fdht_base_path, sizeof(g_fdht_base_path), "%s", pBasePath);
89 		chopPath(g_fdht_base_path);
90 		if (!fileExists(g_fdht_base_path))
91 		{
92 			logError("\"%s\" can't be accessed, error info: %s", \
93 				g_fdht_base_path, STRERROR(errno));
94 			result = errno != 0 ? errno : ENOENT;
95 			break;
96 		}
97 		if (!isDir(g_fdht_base_path))
98 		{
99 			logError("\"%s\" is not a directory!", g_fdht_base_path);
100 			result = ENOTDIR;
101 			break;
102 		}
103 
104 		g_fdht_connect_timeout = iniGetIntValue(NULL, "connect_timeout", \
105 				&iniContext, DEFAULT_CONNECT_TIMEOUT);
106 		if (g_fdht_connect_timeout <= 0)
107 		{
108 			g_fdht_connect_timeout = DEFAULT_CONNECT_TIMEOUT;
109 		}
110 
111 		g_fdht_network_timeout = iniGetIntValue(NULL, "network_timeout", \
112 				&iniContext, DEFAULT_NETWORK_TIMEOUT);
113 		if (g_fdht_network_timeout <= 0)
114 		{
115 			g_fdht_network_timeout = DEFAULT_NETWORK_TIMEOUT;
116 		}
117 
118 		g_keep_alive = iniGetBoolValue(NULL, "keep_alive", \
119 				&iniContext, false);
120 
121 		if ((result=fdht_load_groups(&iniContext, \
122 				&g_group_array)) != 0)
123 		{
124 			break;
125 		}
126 
127 		if (g_group_array.use_proxy)
128 		{
129 			sprintf(szProxyPrompt, "proxy_addr=%s, proxy_port=%d, ",
130 				g_group_array.proxy_server.ip_addr,
131 				g_group_array.proxy_server.port);
132 		}
133 		else
134 		{
135 			*szProxyPrompt = '\0';
136 		}
137 
138 		load_log_level(&iniContext);
139 
140 		logInfo("file: "__FILE__", line: %d, " \
141 			"base_path=%s, " \
142 			"connect_timeout=%ds, network_timeout=%ds, " \
143 			"keep_alive=%d, use_proxy=%d, %s"\
144 			"group_count=%d, server_count=%d", __LINE__, \
145 			g_fdht_base_path, g_fdht_connect_timeout, \
146 			g_fdht_network_timeout, g_keep_alive, \
147 			g_group_array.use_proxy, szProxyPrompt, \
148 			g_group_array.group_count, g_group_array.server_count);
149 
150 		fdht_proxy_extra_deal(&g_group_array, &g_keep_alive);
151 
152 		break;
153 	}
154 
155 	iniFreeContext(&iniContext);
156 
157 	return result;
158 }
159 
/**
 * Load the keep-alive flag and the server groups from a configuration file
 * into caller-supplied storage.
 *
 * @param filename     path of the configuration file
 * @param pGroupArray  out: filled by fdht_load_groups()
 * @param bKeepAlive   out: value of "keep_alive" (default false); forced to
 *                     true when a proxy is used
 * @return 0 on success, otherwise the error from iniLoadFromFile() or
 *         fdht_load_groups()
 */
int fdht_load_conf(const char *filename, GroupArray *pGroupArray,
		bool *bKeepAlive)
{
	IniContext iniContext;
	int result;

	result = iniLoadFromFile(filename, &iniContext);
	if (result != 0)
	{
		logError("file: "__FILE__", line: %d, "
			"load conf file \"%s\" fail, "
			"ret code: %d", __LINE__,
			filename, result);
		return result;
	}

	*bKeepAlive = iniGetBoolValue(NULL, "keep_alive", &iniContext, false);

	result = fdht_load_groups(&iniContext, pGroupArray);
	if (result == 0)
	{
		fdht_proxy_extra_deal(pGroupArray, bKeepAlive);
	}

	/* the ini context is released on both the success and failure path */
	iniFreeContext(&iniContext);
	return result;
}
187 
/**
 * Release the global group array that fdht_client_init() filled.
 */
void fdht_client_destroy()
{
	fdht_free_group_array(&g_group_array);
}
192 
/*
 * Read and write requests pick their server the same way: both aliases
 * expand to the same get_connection() call.
 */
#define get_readable_connection(pServerArray, bKeepAlive, hash_code, err_no) \
	  get_connection(pServerArray, bKeepAlive, hash_code, err_no)

#define get_writable_connection(pServerArray, bKeepAlive, hash_code, err_no) \
	  get_connection(pServerArray, bKeepAlive, hash_code, err_no)
198 
get_connection(ServerArray * pServerArray,const bool bKeepAlive,const int hash_code,int * err_no)199 static FDHTServerInfo *get_connection(ServerArray *pServerArray, \
200 		const bool bKeepAlive, const int hash_code, int *err_no)
201 {
202 	FDHTServerInfo **ppServer;
203 	FDHTServerInfo **ppEnd;
204 	int server_index;
205 	int new_hash_code;
206 
207 	new_hash_code = (hash_code << 16) | (hash_code >> 16);
208 	if (new_hash_code < 0)
209 	{
210 		new_hash_code &= 0x7FFFFFFF;
211 	}
212 	server_index = new_hash_code % pServerArray->count;
213 	ppEnd = pServerArray->servers + pServerArray->count;
214 	for (ppServer = pServerArray->servers + server_index; \
215 		ppServer<ppEnd; ppServer++)
216 	{
217 		if ((*ppServer)->sock > 0)  //already connected
218 		{
219 			return *ppServer;
220 		}
221 
222 		if (fdht_connect_server_nb(*ppServer, \
223 			g_fdht_connect_timeout) == 0)
224 		{
225 			if (bKeepAlive)
226 			{
227 				tcpsetnodelay((*ppServer)->sock, 3600);
228 			}
229 			return *ppServer;
230 		}
231 	}
232 
233 	ppEnd = pServerArray->servers + server_index;
234 	for (ppServer = pServerArray->servers; ppServer<ppEnd; ppServer++)
235 	{
236 		if ((*ppServer)->sock > 0)  //already connected
237 		{
238 			return *ppServer;
239 		}
240 
241 		if (fdht_connect_server_nb(*ppServer, \
242 			g_fdht_connect_timeout) == 0)
243 		{
244 			if (bKeepAlive)
245 			{
246 				tcpsetnodelay((*ppServer)->sock, 3600);
247 			}
248 			return *ppServer;
249 		}
250 	}
251 
252 	*err_no = ENOENT;
253 	return NULL;
254 }
255 
/*
 * Validate the lengths in pKeyInfo, build the string to hash in hash_key
 * and store its PJWHash() in key_hash_code (sign bit cleared if negative).
 *
 * - namespace and object id both empty: the key itself is hashed;
 * - both non-empty: "namespace + FDHT_FULL_KEY_SEPERATOR + object id" is
 *   hashed (the key is not part of the hash);
 * - any other combination, or a length above its maximum, prints a message
 *   to stderr and returns EINVAL from the ENCLOSING function.
 *
 * The macro expands to bare statements containing `return`, so it can only
 * be used at statement level inside a function that returns int.
 */
#define CALC_KEY_HASH_CODE(pKeyInfo, hash_key, hash_key_len, key_hash_code) \
	if (pKeyInfo->namespace_len > FDHT_MAX_NAMESPACE_LEN) \
	{ \
		fprintf(stderr, "namespace length: %d exceeds, " \
			"max length:  %d\n", \
			pKeyInfo->namespace_len, FDHT_MAX_NAMESPACE_LEN); \
		return EINVAL; \
	} \
 \
	if (pKeyInfo->obj_id_len > FDHT_MAX_OBJECT_ID_LEN) \
	{ \
		fprintf(stderr, "object ID length: %d exceeds, " \
			"max length: %d\n", \
			pKeyInfo->obj_id_len, FDHT_MAX_OBJECT_ID_LEN); \
		return EINVAL; \
	} \
 \
	if (pKeyInfo->key_len > FDHT_MAX_SUB_KEY_LEN) \
	{ \
		fprintf(stderr, "key length: %d exceeds, max length: %d\n", \
			pKeyInfo->key_len, FDHT_MAX_SUB_KEY_LEN); \
		return EINVAL; \
	} \
 \
	if (pKeyInfo->namespace_len == 0 && pKeyInfo->obj_id_len == 0) \
	{ \
		hash_key_len = pKeyInfo->key_len; \
		memcpy(hash_key, pKeyInfo->szKey, pKeyInfo->key_len); \
	} \
	else if (pKeyInfo->namespace_len > 0 && pKeyInfo->obj_id_len > 0) \
	{ \
		hash_key_len = pKeyInfo->namespace_len+1+pKeyInfo->obj_id_len; \
		memcpy(hash_key,pKeyInfo->szNameSpace,pKeyInfo->namespace_len);\
		*(hash_key + pKeyInfo->namespace_len)=FDHT_FULL_KEY_SEPERATOR; \
		memcpy(hash_key + pKeyInfo->namespace_len + 1, \
			pKeyInfo->szObjectId, pKeyInfo->obj_id_len); \
	} \
	else \
	{ \
		fprintf(stderr, "invalid namespace length: %d and " \
				"object ID length: %d\n", \
				pKeyInfo->namespace_len, pKeyInfo->obj_id_len); \
		return EINVAL; \
	} \
 \
	key_hash_code = PJWHash(hash_key, hash_key_len); \
	if (key_hash_code < 0) \
	{ \
		key_hash_code &= 0x7FFFFFFF; \
	} \
306 
307 
/*
 * Object-level variant of the hash computation: both the namespace and the
 * object id are mandatory. Builds
 * "namespace + FDHT_FULL_KEY_SEPERATOR + object id" in hash_key and stores
 * its PJWHash() in key_hash_code (sign bit cleared if negative).
 *
 * A non-positive or over-long namespace / object id length prints a message
 * to stderr and returns EINVAL from the ENCLOSING function, so the macro
 * can only be used at statement level inside a function that returns int.
 */
#define CALC_OBJECT_HASH_CODE(pObjectInfo, hash_key, hash_key_len, key_hash_code) \
	if (pObjectInfo->namespace_len <= 0 || pObjectInfo->obj_id_len <= 0) \
	{ \
		fprintf(stderr, "invalid namespace length: %d and " \
			"object ID length: %d\n", \
			pObjectInfo->namespace_len, pObjectInfo->obj_id_len); \
		return EINVAL; \
	} \
 \
	if (pObjectInfo->namespace_len > FDHT_MAX_NAMESPACE_LEN) \
	{ \
		fprintf(stderr, "namespace length: %d exceeds, " \
			"max length:  %d\n", \
			pObjectInfo->namespace_len, FDHT_MAX_NAMESPACE_LEN); \
		return EINVAL; \
	} \
 \
	if (pObjectInfo->obj_id_len > FDHT_MAX_OBJECT_ID_LEN) \
	{ \
		fprintf(stderr, "object ID length: %d exceeds, " \
			"max length: %d\n", \
			pObjectInfo->obj_id_len, FDHT_MAX_OBJECT_ID_LEN); \
		return EINVAL; \
	} \
	hash_key_len = pObjectInfo->namespace_len+1+pObjectInfo->obj_id_len; \
	memcpy(hash_key, pObjectInfo->szNameSpace, pObjectInfo->namespace_len);\
	*(hash_key + pObjectInfo->namespace_len) = FDHT_FULL_KEY_SEPERATOR; \
	memcpy(hash_key + pObjectInfo->namespace_len + 1, \
		pObjectInfo->szObjectId, pObjectInfo->obj_id_len); \
 \
	key_hash_code = PJWHash(hash_key, hash_key_len); \
	if (key_hash_code < 0) \
	{ \
		key_hash_code &= 0x7FFFFFFF; \
	} \
343 
344 
345 /**
346 * request body format:
347 *       namespace_len:  4 bytes big endian integer
*       namespace: can be empty
349 *       obj_id_len:  4 bytes big endian integer
350 *       object_id: the object id (can be empty)
351 *       key_len:  4 bytes big endian integer
352 *       key:      key name
353 * response body format:
354 *       value_len:  4 bytes big endian integer
355 *       value:      value buff
356 */
/**
 * Fetch the value of one key from the group selected by the key's hash.
 *
 * @param pGroupArray  groups to choose the server from
 * @param bKeepAlive   true to leave the connection open after the request
 * @param pKeyInfo     namespace / object id / key to look up
 * @param expires      expire time placed in the request header
 * @param ppValue      in/out: a non-NULL *ppValue is used as the caller's
 *                     buffer; when NULL, value length + 1 bytes are
 *                     obtained from malloc_func and stored here
 * @param value_len    in: capacity of *ppValue when it is non-NULL (must be
 *                     larger than the value, one byte is kept for '\0');
 *                     out: length of the value, 0 on ENOSPC or allocation
 *                     failure
 * @param malloc_func  allocator used when *ppValue is NULL
 * @return 0 on success; EINVAL for invalid key info or a malformed
 *         response; ENOSPC when the caller's buffer is too small; the
 *         error from connecting, sending, receiving or allocating otherwise
 *
 * NOTE(review): a buffer obtained from malloc_func is left in *ppValue when
 * the following receive fails; it cannot be released here because the
 * matching deallocator is unknown - confirm how callers handle this.
 */
int fdht_get_ex1(GroupArray *pGroupArray, const bool bKeepAlive,
		FDHTKeyInfo *pKeyInfo, const time_t expires,
		char **ppValue, int *value_len, MallocFunc malloc_func)
{
	int result;
	FDHTProtoHeader *pHeader;
	char hash_key[FDHT_MAX_FULL_KEY_LEN + 1];
	char buff[sizeof(FDHTProtoHeader) + FDHT_MAX_FULL_KEY_LEN + 16];
	int in_bytes;
	int vlen;
	int group_id;
	int hash_key_len;
	int key_hash_code;
	int i;
	bool bDropConn;
	ServerArray *pGroup;
	FDHTServerInfo *pServer;
	char *p;

	CALC_KEY_HASH_CODE(pKeyInfo, hash_key, hash_key_len, key_hash_code)
	group_id = ((unsigned int)key_hash_code) % pGroupArray->group_count;
	//printf("get group_id=%d\n", group_id);

	pGroup = pGroupArray->groups + group_id;
	for (i=0; i<=pGroup->count; i++)
	{
		pServer = get_readable_connection(pGroup, bKeepAlive,
				key_hash_code, &result);
		if (pServer == NULL)
		{
			return result;
		}

		memset(buff, 0, sizeof(buff));
		pHeader = (FDHTProtoHeader *)buff;

		pHeader->cmd = FDHT_PROTO_CMD_GET;
		pHeader->keep_alive = bKeepAlive;
		int2buff((int)time(NULL), pHeader->timestamp);
		int2buff((int)expires, pHeader->expires);
		int2buff(key_hash_code, pHeader->key_hash_code);
		int2buff(12 + pKeyInfo->namespace_len + pKeyInfo->obj_id_len +
			pKeyInfo->key_len, pHeader->pkg_len);

		/* set when part of the response may still be unread in the socket */
		bDropConn = false;
		do
		{
			p = buff + sizeof(FDHTProtoHeader);
			PACK_BODY_UNTIL_KEY(pKeyInfo, p)
			if ((result=tcpsenddata_nb(pServer->sock, buff, p - buff,
				g_fdht_network_timeout)) != 0)
			{
				logError("send data to server %s:%d fail, "
					"errno: %d, error info: %s",
					pServer->ip_addr, pServer->port,
					result, STRERROR(result));
				break;
			}

			if ((result=fdht_recv_header(pServer, &in_bytes)) != 0)
			{
				break;
			}

			if (in_bytes < 4)
			{
				logError("server %s:%d reponse bytes: %d < 4",
					pServer->ip_addr, pServer->port, in_bytes);
				result = EINVAL;
				bDropConn = true;
				break;
			}

			if ((result=tcprecvdata_nb(pServer->sock, buff,
				4, g_fdht_network_timeout)) != 0)
			{
				logError("file: "__FILE__", line: %d, "
					"server: %s:%d, recv data fail, "
					"errno: %d, error info: %s",
					__LINE__, pServer->ip_addr,
					pServer->port,
					result, STRERROR(result));
				break;
			}

			vlen = buff2int(buff);
			if (vlen != in_bytes - 4)
			{
				logError("server %s:%d reponse bytes: %d "
					"is not correct, %d != %d", pServer->ip_addr,
					pServer->port, in_bytes, vlen, in_bytes - 4);
				result = EINVAL;
				bDropConn = true;
				break;
			}

			if (*ppValue != NULL)
			{
				/* one byte of the caller's buffer is kept for '\0' */
				if (vlen >= *value_len)
				{
					*value_len = 0;
					result = ENOSPC;
					bDropConn = true;
					break;
				}

				*value_len = vlen;
			}
			else
			{
				*value_len = vlen;
				*ppValue = (char *)malloc_func((*value_len + 1));
				if (*ppValue == NULL)
				{
					/* capture errno and the size before resetting */
					result = errno != 0 ? errno : ENOMEM;
					*value_len = 0;
					logError("malloc %d bytes fail, "
						"errno: %d, error info: %s",
						vlen + 1, result, STRERROR(result));
					bDropConn = true;
					break;
				}
			}

			if ((result=tcprecvdata_nb(pServer->sock, *ppValue,
				*value_len, g_fdht_network_timeout)) != 0)
			{
				logError("file: "__FILE__", line: %d, "
					"server: %s:%d, recv data fail, "
					"errno: %d, error info: %s",
					__LINE__, pServer->ip_addr,
					pServer->port,
					result, STRERROR(result));
				break;
			}

			*(*ppValue + *value_len) = '\0';
		} while(0);

		if (bKeepAlive)
		{
			if (result >= ENETDOWN) //network error
			{
				fdht_disconnect_server(pServer);
				if (result == ENOTCONN)
				{
					continue;  //retry
				}
			}
			else if (bDropConn)
			{
				/*
				 * Unread response bytes would be taken for the
				 * header of the next reply on this connection.
				 */
				fdht_disconnect_server(pServer);
			}
		}
		else
		{
			fdht_disconnect_server(pServer);
		}

		break;
	}

	return result;
}
511 
/**
 * Store several key/value pairs of one object with a single request.
 *
 * @param pGroupArray    groups to choose the server from
 * @param bKeepAlive     true to leave the connection open after the request
 * @param pObjectInfo    namespace and object id (both required)
 * @param key_list       in: keys and values to store; out: key_len, szKey
 *                       and status are overwritten from the response
 * @param key_count      number of entries, 1..FDHT_MAX_KEY_COUNT_PER_REQ
 * @param expires        expire time placed in the request header
 * @param success_count  out: success count reported by the server, 0 on
 *                       failure
 * @return 0 on success; EINVAL for invalid arguments or a malformed
 *         response; the error from connecting, allocating, sending or
 *         receiving otherwise
 */
int fdht_batch_set_ex(GroupArray *pGroupArray, const bool bKeepAlive,
		FDHTObjectInfo *pObjectInfo, FDHTKeyValuePair *key_list,
		const int key_count, const time_t expires, int *success_count)
{
	int result;
	FDHTProtoHeader *pHeader;
	char hash_key[FDHT_MAX_FULL_KEY_LEN + 1];
	char buff[sizeof(FDHTProtoHeader) + FDHT_MAX_FULL_KEY_LEN +
		(8 + FDHT_MAX_SUB_KEY_LEN) * FDHT_MAX_KEY_COUNT_PER_REQ +
		32 * 1024];
	char *pBuff;
	char *pRespEnd;
	int in_bytes;
	int resp_key_len;
	int total_key_len;
	int total_value_len;
	int pkg_total_len;
	int group_id;
	int hash_key_len;
	int key_hash_code;
	int i;
	ServerArray *pGroup;
	FDHTServerInfo *pServer;
	FDHTKeyValuePair *pKeyValuePair;
	FDHTKeyValuePair *pKeyValueEnd;
	char *p;

	*success_count = 0;
	if (key_count <= 0 || key_count > FDHT_MAX_KEY_COUNT_PER_REQ)
	{
		logError("invalid key_count: %d", key_count);
		return EINVAL;
	}

	CALC_OBJECT_HASH_CODE(pObjectInfo, hash_key, hash_key_len, key_hash_code)
	group_id = ((unsigned int)key_hash_code) % pGroupArray->group_count;
	pGroup = pGroupArray->groups + group_id;

	/* validate the lengths once; they feed memcpy() and the buffer size */
	total_key_len = 0;
	total_value_len = 0;
	pKeyValueEnd = key_list + key_count;
	for (pKeyValuePair=key_list; pKeyValuePair<pKeyValueEnd; pKeyValuePair++)
	{
		if (pKeyValuePair->key_len < 0 ||
			pKeyValuePair->key_len > FDHT_MAX_SUB_KEY_LEN)
		{
			logError("invalid key length: %d", pKeyValuePair->key_len);
			return EINVAL;
		}
		if (pKeyValuePair->value_len < 0)
		{
			logError("invalid value length: %d",
				pKeyValuePair->value_len);
			return EINVAL;
		}

		total_key_len += pKeyValuePair->key_len;
		total_value_len += pKeyValuePair->value_len;
	}
	pkg_total_len = sizeof(FDHTProtoHeader) + 12 + pObjectInfo->namespace_len +
			pObjectInfo->obj_id_len + 8 * key_count +
			total_key_len + total_value_len;

	/* the same buffer holds the request and then the (smaller) response */
	if ((size_t)pkg_total_len <= sizeof(buff))
	{
		pBuff = buff;
	}
	else
	{
		pBuff = (char *)malloc(pkg_total_len);
		if (pBuff == NULL)
		{
			result = errno != 0 ? errno : ENOMEM;
			logError("malloc %d bytes fail, "
				"errno: %d, error info: %s",
				pkg_total_len, result, STRERROR(result));
			return result;
		}
	}

	for (i=0; i<=pGroup->count; i++)
	{
		pServer = get_writable_connection(pGroup, bKeepAlive,
				key_hash_code, &result);
		if (pServer == NULL)
		{
			break;
		}

		/* rebuilt on every attempt: a response may have overwritten it */
		memset(pBuff, 0, pkg_total_len);
		pHeader = (FDHTProtoHeader *)pBuff;

		pHeader->cmd = FDHT_PROTO_CMD_BATCH_SET;
		pHeader->keep_alive = bKeepAlive;
		int2buff((int)time(NULL), pHeader->timestamp);
		int2buff((int)expires, pHeader->expires);
		int2buff(key_hash_code, pHeader->key_hash_code);
		int2buff(pkg_total_len - (int)sizeof(FDHTProtoHeader),
			pHeader->pkg_len);

		p = pBuff + sizeof(FDHTProtoHeader);
		PACK_BODY_OBJECT(pObjectInfo, p)
		int2buff(key_count, p);
		p += 4;

		for (pKeyValuePair=key_list; pKeyValuePair<pKeyValueEnd;
			pKeyValuePair++)
		{
			int2buff(pKeyValuePair->key_len, p);
			memcpy(p + 4, pKeyValuePair->szKey, pKeyValuePair->key_len);
			p += 4 + pKeyValuePair->key_len;

			int2buff(pKeyValuePair->value_len, p);
			memcpy(p + 4, pKeyValuePair->pValue,
				pKeyValuePair->value_len);
			p += 4 + pKeyValuePair->value_len;
		}

		do
		{
			if ((result=tcpsenddata_nb(pServer->sock, pBuff,
				pkg_total_len, g_fdht_network_timeout)) != 0)
			{
				logError("send data to server %s:%d fail, "
					"errno: %d, error info: %s",
					pServer->ip_addr, pServer->port,
					result, STRERROR(result));
				break;
			}

			if ((result=fdht_recv_header(pServer, &in_bytes)) != 0)
			{
				break;
			}

			if (in_bytes != 8 + 5 * key_count + total_key_len)
			{
				logError("server %s:%d reponse bytes: %d != %d",
					pServer->ip_addr, pServer->port, in_bytes,
					8 + 5 * key_count + total_key_len);
				result = EINVAL;
				break;
			}

			if ((result=tcprecvdata_nb(pServer->sock, pBuff,
				in_bytes, g_fdht_network_timeout)) != 0)
			{
				logError("file: "__FILE__", line: %d, "
					"server: %s:%d, recv data fail, "
					"errno: %d, error info: %s",
					__LINE__, pServer->ip_addr, pServer->port,
					result, STRERROR(result));
				break;
			}

			if (buff2int(pBuff) != key_count)
			{
				result = EINVAL;
				logError("file: "__FILE__", line: %d, "
					"server: %s:%d, invalid key_count: %d, "
					"expect key count: %d",
					__LINE__, pServer->ip_addr, pServer->port,
					buff2int(pBuff), key_count);
				break;
			}

			*success_count = buff2int(pBuff + 4);
			p = pBuff + 8;
			pRespEnd = pBuff + in_bytes;
			for (pKeyValuePair=key_list; pKeyValuePair<pKeyValueEnd;
				pKeyValuePair++)
			{
				/* each entry: 4 byte length + key + 1 byte status */
				resp_key_len = (pRespEnd - p >= 5) ? buff2int(p) : -1;
				if (resp_key_len < 0 ||
					resp_key_len > FDHT_MAX_SUB_KEY_LEN ||
					resp_key_len > (pRespEnd - p) - 5)
				{
					logError("file: "__FILE__", line: %d, "
						"server: %s:%d, invalid key length "
						"in response: %d", __LINE__,
						pServer->ip_addr, pServer->port,
						resp_key_len);
					*success_count = 0;
					result = EINVAL;
					break;
				}

				pKeyValuePair->key_len = resp_key_len;
				memcpy(pKeyValuePair->szKey, p + 4, resp_key_len);
				p += 4 + resp_key_len;
				pKeyValuePair->status = *p++;
			}
		} while (0);

		if (bKeepAlive)
		{
			if (result >= ENETDOWN) //network error
			{
				fdht_disconnect_server(pServer);
				if (result == ENOTCONN)
				{
					continue;  //retry
				}
			}
		}
		else
		{
			fdht_disconnect_server(pServer);
		}
		break;
	}

	if (pBuff != buff)
	{
		free(pBuff);
	}

	return result;
}
698 
/**
 * Delete several keys of one object with a single request.
 *
 * @param pGroupArray    groups to choose the server from
 * @param bKeepAlive     true to leave the connection open after the request
 * @param pObjectInfo    namespace and object id (both required)
 * @param key_list       in: keys to delete; out: key_len, szKey and status
 *                       are overwritten from the response
 * @param key_count      number of entries, 1..FDHT_MAX_KEY_COUNT_PER_REQ
 * @param success_count  out: success count reported by the server, 0 on
 *                       failure
 * @return 0 on success; EINVAL for invalid arguments or a malformed
 *         response; the error from connecting, sending or receiving
 *         otherwise
 */
int fdht_batch_delete_ex(GroupArray *pGroupArray, const bool bKeepAlive,
		FDHTObjectInfo *pObjectInfo, FDHTKeyValuePair *key_list,
		const int key_count, int *success_count)
{
	int result;
	FDHTProtoHeader *pHeader;
	char hash_key[FDHT_MAX_FULL_KEY_LEN + 1];
	char buff[sizeof(FDHTProtoHeader) + FDHT_MAX_FULL_KEY_LEN + 8 +
		(5 + FDHT_MAX_SUB_KEY_LEN) * FDHT_MAX_KEY_COUNT_PER_REQ];
	char *pRespEnd;
	int in_bytes;
	int resp_key_len;
	int total_key_len;
	int group_id;
	int hash_key_len;
	int key_hash_code;
	int i;
	ServerArray *pGroup;
	FDHTServerInfo *pServer;
	FDHTKeyValuePair *pKeyValuePair;
	FDHTKeyValuePair *pKeyValueEnd;
	char *p;

	*success_count = 0;
	if (key_count <= 0 || key_count > FDHT_MAX_KEY_COUNT_PER_REQ)
	{
		logError("invalid key_count: %d", key_count);
		return EINVAL;
	}

	CALC_OBJECT_HASH_CODE(pObjectInfo, hash_key, hash_key_len, key_hash_code)
	group_id = ((unsigned int)key_hash_code) % pGroupArray->group_count;
	pGroup = pGroupArray->groups + group_id;

	/*
	 * The request is packed into a fixed stack buffer sized for
	 * FDHT_MAX_SUB_KEY_LEN bytes per key, so reject anything else first.
	 */
	total_key_len = 0;
	pKeyValueEnd = key_list + key_count;
	for (pKeyValuePair=key_list; pKeyValuePair<pKeyValueEnd; pKeyValuePair++)
	{
		if (pKeyValuePair->key_len < 0 ||
			pKeyValuePair->key_len > FDHT_MAX_SUB_KEY_LEN)
		{
			logError("invalid key length: %d", pKeyValuePair->key_len);
			return EINVAL;
		}

		total_key_len += pKeyValuePair->key_len;
	}

	for (i=0; i<=pGroup->count; i++)
	{
		pServer = get_readable_connection(pGroup, bKeepAlive,
				key_hash_code, &result);
		if (pServer == NULL)
		{
			return result;
		}

		/* the expires field of the header stays zero for a delete */
		memset(buff, 0, sizeof(buff));
		pHeader = (FDHTProtoHeader *)buff;

		pHeader->cmd = FDHT_PROTO_CMD_BATCH_DEL;
		pHeader->keep_alive = bKeepAlive;
		int2buff((int)time(NULL), pHeader->timestamp);
		int2buff(key_hash_code, pHeader->key_hash_code);

		p = buff + sizeof(FDHTProtoHeader);
		PACK_BODY_OBJECT(pObjectInfo, p)
		int2buff(key_count, p);
		p += 4;

		for (pKeyValuePair=key_list; pKeyValuePair<pKeyValueEnd;
			pKeyValuePair++)
		{
			int2buff(pKeyValuePair->key_len, p);
			memcpy(p + 4, pKeyValuePair->szKey, pKeyValuePair->key_len);
			p += 4 + pKeyValuePair->key_len;
		}

		do
		{
			int2buff((int)((p - buff) - sizeof(FDHTProtoHeader)),
				pHeader->pkg_len);
			if ((result=tcpsenddata_nb(pServer->sock, buff, p - buff,
				g_fdht_network_timeout)) != 0)
			{
				logError("send data to server %s:%d fail, "
					"errno: %d, error info: %s",
					pServer->ip_addr, pServer->port,
					result, STRERROR(result));
				break;
			}

			if ((result=fdht_recv_header(pServer, &in_bytes)) != 0)
			{
				break;
			}

			if (in_bytes != 8 + 5 * key_count + total_key_len)
			{
				logError("server %s:%d reponse bytes: %d != %d",
					pServer->ip_addr, pServer->port, in_bytes,
					8 + 5 * key_count + total_key_len);
				result = EINVAL;
				break;
			}

			if ((result=tcprecvdata_nb(pServer->sock, buff,
				in_bytes, g_fdht_network_timeout)) != 0)
			{
				logError("file: "__FILE__", line: %d, "
					"server: %s:%d, recv data fail, "
					"errno: %d, error info: %s",
					__LINE__, pServer->ip_addr, pServer->port,
					result, STRERROR(result));
				break;
			}

			if (buff2int(buff) != key_count)
			{
				result = EINVAL;
				logError("file: "__FILE__", line: %d, "
					"server: %s:%d, invalid key_count: %d, "
					"expect key count: %d",
					__LINE__, pServer->ip_addr, pServer->port,
					buff2int(buff), key_count);
				break;
			}

			*success_count = buff2int(buff + 4);
			p = buff + 8;
			pRespEnd = buff + in_bytes;
			for (pKeyValuePair=key_list; pKeyValuePair<pKeyValueEnd;
				pKeyValuePair++)
			{
				/* each entry: 4 byte length + key + 1 byte status */
				resp_key_len = (pRespEnd - p >= 5) ? buff2int(p) : -1;
				if (resp_key_len < 0 ||
					resp_key_len > FDHT_MAX_SUB_KEY_LEN ||
					resp_key_len > (pRespEnd - p) - 5)
				{
					logError("file: "__FILE__", line: %d, "
						"server: %s:%d, invalid key length "
						"in response: %d", __LINE__,
						pServer->ip_addr, pServer->port,
						resp_key_len);
					*success_count = 0;
					result = EINVAL;
					break;
				}

				pKeyValuePair->key_len = resp_key_len;
				memcpy(pKeyValuePair->szKey, p + 4, resp_key_len);
				p += 4 + resp_key_len;
				pKeyValuePair->status = *p++;
			}
		} while (0);

		if (bKeepAlive)
		{
			if (result >= ENETDOWN) //network error
			{
				fdht_disconnect_server(pServer);
				if (result == ENOTCONN)
				{
					continue;  //retry
				}
			}
		}
		else
		{
			fdht_disconnect_server(pServer);
		}

		break;
	}

	return result;
}
847 
/**
 * Fetch the values of several keys of one object with a single request.
 *
 * @param pGroupArray    groups to choose the server from
 * @param bKeepAlive     true to leave the connection open after the request
 * @param pObjectInfo    namespace and object id (both required)
 * @param key_list       in: keys to fetch; an entry with a non-NULL pValue
 *                       supplies a buffer of value_len bytes, an entry with
 *                       a NULL pValue gets a buffer from malloc_func.
 *                       out: key_len, szKey, status, value_len and the
 *                       value (always '\0' terminated) per entry; status is
 *                       ENOSPC when the supplied buffer is too small
 * @param key_count      number of entries, 1..FDHT_MAX_KEY_COUNT_PER_REQ
 * @param expires        expire time placed in the request header
 * @param malloc_func    allocator used for entries whose pValue is NULL
 * @param success_count  out: success count reported by the server, 0 on
 *                       failure
 * @return 0 on success; EINVAL for invalid arguments or a malformed
 *         response; the error from connecting, allocating, sending or
 *         receiving otherwise
 */
int fdht_batch_get_ex1(GroupArray *pGroupArray, const bool bKeepAlive,
		FDHTObjectInfo *pObjectInfo, FDHTKeyValuePair *key_list,
		const int key_count, const time_t expires,
		MallocFunc malloc_func, int *success_count)
{
	int result;
	FDHTProtoHeader *pHeader;
	char hash_key[FDHT_MAX_FULL_KEY_LEN + 1];
	char buff[sizeof(FDHTProtoHeader) + FDHT_MAX_FULL_KEY_LEN +
		(4 + FDHT_MAX_SUB_KEY_LEN) * FDHT_MAX_KEY_COUNT_PER_REQ +
		32 * 1024];
	int in_bytes;
	int resp_key_len;
	int value_len;
	int group_id;
	int hash_key_len;
	int key_hash_code;
	char *pInBuff;
	char *pRespEnd;
	int i;
	ServerArray *pGroup;
	FDHTServerInfo *pServer;
	FDHTKeyValuePair *pKeyValuePair;
	FDHTKeyValuePair *pKeyValueEnd;
	char *p;

	*success_count = 0;
	if (key_count <= 0 || key_count > FDHT_MAX_KEY_COUNT_PER_REQ)
	{
		logError("invalid key_count: %d", key_count);
		return EINVAL;
	}

	CALC_OBJECT_HASH_CODE(pObjectInfo, hash_key, hash_key_len, key_hash_code)
	group_id = ((unsigned int)key_hash_code) % pGroupArray->group_count;
	pGroup = pGroupArray->groups + group_id;

	/*
	 * The request is packed into a fixed stack buffer sized for
	 * FDHT_MAX_SUB_KEY_LEN bytes per key, so reject anything else first.
	 */
	pKeyValueEnd = key_list + key_count;
	for (pKeyValuePair=key_list; pKeyValuePair<pKeyValueEnd; pKeyValuePair++)
	{
		if (pKeyValuePair->key_len < 0 ||
			pKeyValuePair->key_len > FDHT_MAX_SUB_KEY_LEN)
		{
			logError("invalid key length: %d", pKeyValuePair->key_len);
			return EINVAL;
		}
	}

	for (i=0; i<=pGroup->count; i++)
	{
		pServer = get_readable_connection(pGroup, bKeepAlive,
				key_hash_code, &result);
		if (pServer == NULL)
		{
			return result;
		}

		memset(buff, 0, sizeof(buff));
		pHeader = (FDHTProtoHeader *)buff;

		pHeader->cmd = FDHT_PROTO_CMD_BATCH_GET;
		pHeader->keep_alive = bKeepAlive;
		int2buff((int)time(NULL), pHeader->timestamp);
		int2buff((int)expires, pHeader->expires);
		int2buff(key_hash_code, pHeader->key_hash_code);

		p = buff + sizeof(FDHTProtoHeader);
		PACK_BODY_OBJECT(pObjectInfo, p)
		int2buff(key_count, p);
		p += 4;

		for (pKeyValuePair=key_list; pKeyValuePair<pKeyValueEnd;
			pKeyValuePair++)
		{
			int2buff(pKeyValuePair->key_len, p);
			memcpy(p + 4, pKeyValuePair->szKey, pKeyValuePair->key_len);
			p += 4 + pKeyValuePair->key_len;
		}

		/* the response goes to buff unless it is too large for it */
		pInBuff = buff;
		do
		{
			int2buff((int)((p - buff) - sizeof(FDHTProtoHeader)),
				pHeader->pkg_len);
			if ((result=tcpsenddata_nb(pServer->sock, buff, p - buff,
				g_fdht_network_timeout)) != 0)
			{
				logError("send data to server %s:%d fail, "
					"errno: %d, error info: %s",
					pServer->ip_addr, pServer->port,
					result, STRERROR(result));
				break;
			}

			if ((result=fdht_recv_header(pServer, &in_bytes)) != 0)
			{
				break;
			}

			if (in_bytes < 17)
			{
				logError("server %s:%d reponse bytes: %d < 17",
					pServer->ip_addr, pServer->port, in_bytes);
				result = EINVAL;
				break;
			}

			if ((size_t)in_bytes > sizeof(buff))
			{
				pInBuff = (char *)malloc(in_bytes);
				if (pInBuff == NULL)
				{
					result = errno != 0 ? errno : ENOMEM;
					logError("file: "__FILE__", line: %d, "
						"malloc %d bytes fail, "
						"errno: %d, error info: %s",
						__LINE__, in_bytes,
						result, STRERROR(result));
					break;
				}
			}

			if ((result=tcprecvdata_nb(pServer->sock, pInBuff,
				in_bytes, g_fdht_network_timeout)) != 0)
			{
				logError("file: "__FILE__", line: %d, "
					"server: %s:%d, recv data fail, "
					"errno: %d, error info: %s",
					__LINE__, pServer->ip_addr, pServer->port,
					result, STRERROR(result));
				break;
			}

			if (buff2int(pInBuff) != key_count)
			{
				result = EINVAL;
				logError("file: "__FILE__", line: %d, "
					"server: %s:%d, invalid key_count: %d, "
					"expect key count: %d",
					__LINE__, pServer->ip_addr, pServer->port,
					buff2int(pInBuff), key_count);
				break;
			}

			*success_count = buff2int(pInBuff + 4);
			p = pInBuff + 8;
			pRespEnd = pInBuff + in_bytes;
			for (pKeyValuePair=key_list; pKeyValuePair<pKeyValueEnd;
				pKeyValuePair++)
			{
				/* each entry starts: 4 byte length + key + status */
				resp_key_len = (pRespEnd - p >= 5) ? buff2int(p) : -1;
				if (resp_key_len < 0 ||
					resp_key_len > FDHT_MAX_SUB_KEY_LEN ||
					resp_key_len > (pRespEnd - p) - 5)
				{
					logError("file: "__FILE__", line: %d, "
						"server: %s:%d, invalid key length "
						"in response: %d", __LINE__,
						pServer->ip_addr, pServer->port,
						resp_key_len);
					result = EINVAL;
					break;
				}

				pKeyValuePair->key_len = resp_key_len;
				memcpy(pKeyValuePair->szKey, p + 4, resp_key_len);
				p += 4 + resp_key_len;
				pKeyValuePair->status = *p++;
				if (pKeyValuePair->status != 0)
				{
					/* a failed key carries no value */
					pKeyValuePair->value_len = 0;
					continue;
				}

				/* the value must lie inside the received body */
				value_len = (pRespEnd - p >= 4) ? buff2int(p) : -1;
				if (value_len < 0 || value_len > (pRespEnd - p) - 4)
				{
					logError("file: "__FILE__", line: %d, "
						"server: %s:%d, invalid value length "
						"in response: %d", __LINE__,
						pServer->ip_addr, pServer->port,
						value_len);
					result = EINVAL;
					break;
				}
				p += 4;

				if (pKeyValuePair->pValue != NULL)
				{
					/* one byte of the buffer is kept for '\0' */
					if (value_len >= pKeyValuePair->value_len)
					{
						*(pKeyValuePair->pValue) = '\0';
						pKeyValuePair->value_len = 0;
						pKeyValuePair->status = ENOSPC;
					}
					else
					{
						pKeyValuePair->value_len = value_len;
						memcpy(pKeyValuePair->pValue, p,
							value_len);
						*(pKeyValuePair->pValue+value_len)='\0';
					}
				}
				else
				{
					pKeyValuePair->pValue = (char *)malloc_func(
							value_len + 1);
					if (pKeyValuePair->pValue == NULL)
					{
						pKeyValuePair->value_len = 0;
						pKeyValuePair->status = errno != 0 ?
									errno : ENOMEM;
						logError("malloc %d bytes fail, "
							"errno: %d, error info: %s",
							value_len+1, errno,
							STRERROR(errno));
					}
					else
					{
						pKeyValuePair->value_len = value_len;
						memcpy(pKeyValuePair->pValue, p,
							value_len);
						*(pKeyValuePair->pValue+value_len)='\0';
					}
				}

				p += value_len;
			}

			if (result != 0)
			{
				*success_count = 0;
				break;
			}

			if (in_bytes != p - pInBuff)
			{
				*success_count = 0;
				logError("server %s:%d reponse bytes: %d != %d",
					pServer->ip_addr, pServer->port,
					in_bytes, (int)(p - pInBuff));
				result = EINVAL;
				break;
			}
		} while (0);

		if (pInBuff != buff)
		{
			free(pInBuff);
		}

		if (bKeepAlive)
		{
			if (result >= ENETDOWN) //network error
			{
				fdht_disconnect_server(pServer);
				if (result == ENOTCONN)
				{
					continue;  //retry
				}
			}
		}
		else
		{
			fdht_disconnect_server(pServer);
		}

		break;
	}

	return result;
}
1075 
/**
 * Store one key/value pair on the group selected by the key's hash.
 *
 * The request itself is issued by fdht_client_set(). With keep-alive on,
 * a network error closes the connection and ENOTCONN triggers another
 * attempt; with keep-alive off the connection is always closed.
 *
 * @param pGroupArray  groups to choose the server from
 * @param bKeepAlive   true to leave the connection open after the request
 * @param pKeyInfo     namespace / object id / key to store
 * @param expires      expire time passed on to fdht_client_set()
 * @param pValue       value bytes
 * @param value_len    number of bytes in pValue
 * @return 0 on success; EINVAL for invalid key info; otherwise the error
 *         from obtaining a connection or from fdht_client_set()
 */
int fdht_set_ex(GroupArray *pGroupArray, const bool bKeepAlive,
		FDHTKeyInfo *pKeyInfo, const time_t expires,
		const char *pValue, const int value_len)
{
	char hash_key[FDHT_MAX_FULL_KEY_LEN + 1];
	int hash_key_len;
	int key_hash_code;
	int group_id;
	int result;
	int attempt;
	ServerArray *pGroup;
	FDHTServerInfo *pServer;

	CALC_KEY_HASH_CODE(pKeyInfo, hash_key, hash_key_len, key_hash_code)
	group_id = ((unsigned int)key_hash_code) % pGroupArray->group_count;
	pGroup = pGroupArray->groups + group_id;

	attempt = 0;
	while (attempt <= pGroup->count)
	{
		attempt++;

		pServer = get_writable_connection(pGroup, bKeepAlive,
				key_hash_code, &result);
		if (pServer == NULL)
		{
			return result;
		}

		result = fdht_client_set(pServer, bKeepAlive, time(NULL), expires,
				FDHT_PROTO_CMD_SET, key_hash_code,
				pKeyInfo, pValue, value_len);

		if (!bKeepAlive)
		{
			fdht_disconnect_server(pServer);
			break;
		}

		if (result < ENETDOWN)  //not a network error
		{
			break;
		}

		fdht_disconnect_server(pServer);
		if (result != ENOTCONN)
		{
			break;
		}
		//ENOTCONN: go round again
	}

	return result;
}
1130 
/**
* request body format:
*       namespace_len:  4 bytes big endian integer
*       namespace: can be empty
*       obj_id_len:  4 bytes big endian integer
*       object_id: the object id (can be empty)
*       key_len:  4 bytes big endian integer
*       key:      key name
*       incr:     4 bytes big endian integer
* response body format:
*      value_len: 4 bytes big endian integer
*      value :  value_len bytes
*/
/**
* Send an INC request for pKeyInfo to a writable server of the key's group
* and copy the value returned by the server into pValue.
*
* At most pGroup->count + 1 attempts are made; a new attempt happens only
* in keep-alive mode when the previous one ended with ENOTCONN.
*
* parameters:
*	pGroupArray: the group array to pick the target group from
*	bKeepAlive:  true to keep the connection open after the request
*	pKeyInfo:    the key (namespace, object id and key name)
*	expires:     written into the request header
*	increase:    the increment, sent as the last 4 bytes of the body
*	pValue:      output buffer, NUL terminated on success
*	value_len:   in: size of pValue; the returned value must be strictly
*	             shorter so the terminating '\0' fits.
*	             out: length of the returned value, 0 on ENOSPC
* return: 0 for success, EINVAL when the response is shorter than 4 bytes,
*	ENOSPC when pValue is too small, otherwise the error code of the
*	failing connect / send / recv call
*/
int fdht_inc_ex(GroupArray *pGroupArray, const bool bKeepAlive,
		FDHTKeyInfo *pKeyInfo, const time_t expires,
		const int increase, char *pValue, int *value_len)
{
	char hash_key[FDHT_MAX_FULL_KEY_LEN + 1];
	char buff[FDHT_MAX_FULL_KEY_LEN + 32];
	FDHTProtoHeader *pHeader;
	ServerArray *pTargetGroup;
	FDHTServerInfo *pConn;
	char *in_buff;
	char *p;
	int hash_key_len;
	int key_hash_code;
	int group_index;
	int in_bytes;
	int attempt;
	int result;

	CALC_KEY_HASH_CODE(pKeyInfo, hash_key, hash_key_len, key_hash_code)
	group_index = ((unsigned int)key_hash_code) % pGroupArray->group_count;
	pTargetGroup = pGroupArray->groups + group_index;

	for (attempt = 0; attempt <= pTargetGroup->count; attempt++)
	{
		pConn = get_writable_connection(pTargetGroup, bKeepAlive,
				key_hash_code, &result);
		if (pConn == NULL)
		{
			return result;
		}

		/* the request header lives at the start of buff */
		memset(buff, 0, sizeof(buff));
		pHeader = (FDHTProtoHeader *)buff;

		pHeader->cmd = FDHT_PROTO_CMD_INC;
		pHeader->keep_alive = bKeepAlive;
		int2buff((int)time(NULL), pHeader->timestamp);
		int2buff((int)expires, pHeader->expires);
		int2buff(key_hash_code, pHeader->key_hash_code);
		/* 16 = three 4-byte length fields + the 4-byte increment */
		int2buff(16 + pKeyInfo->namespace_len + pKeyInfo->obj_id_len +
			pKeyInfo->key_len, pHeader->pkg_len);

		/* single pass; every path leaves through a break */
		for (;;)
		{
			p = buff + sizeof(FDHTProtoHeader);
			PACK_BODY_UNTIL_KEY(pKeyInfo, p)
			int2buff(increase, p);
			p += 4;

			result = tcpsenddata_nb(pConn->sock, buff, p - buff,
					g_fdht_network_timeout);
			if (result != 0)
			{
				logError("send data to server %s:%d fail, "
					"errno: %d, error info: %s",
					pConn->ip_addr, pConn->port,
					result, STRERROR(result));
				break;
			}

			/* the request buffer is reused for the response */
			in_buff = buff;
			result = fdht_recv_response(pConn, &in_buff,
					sizeof(buff), &in_bytes);
			if (result != 0)
			{
				logError("recv data from server %s:%d fail, "
					"errno: %d, error info: %s",
					pConn->ip_addr, pConn->port,
					result, STRERROR(result));
				break;
			}

			if (in_bytes < 4)
			{
				logError("server %s:%d reponse bytes: %d < 4!",
					pConn->ip_addr, pConn->port, in_bytes);
				result = EINVAL;
				break;
			}

			/* one byte of pValue is reserved for the '\0' */
			if (in_bytes - 4 >= *value_len)
			{
				*value_len = 0;
				result = ENOSPC;
				break;
			}

			/* skip the leading 4 bytes, copy the rest */
			*value_len = in_bytes - 4;
			memcpy(pValue, in_buff + 4, *value_len);
			pValue[*value_len] = '\0';
			break;
		}

		if (!bKeepAlive)
		{
			/* short connection: always close after the request */
			fdht_disconnect_server(pConn);
			break;
		}

		if (result < ENETDOWN)
		{
			/* not a network error: leave the connection open */
			break;
		}

		/* network error: drop the connection */
		fdht_disconnect_server(pConn);
		if (result != ENOTCONN)
		{
			break;
		}

		/* ENOTCONN: fall through to the next attempt */
	}

	return result;
}
1256 
/**
* Send a DEL request for pKeyInfo to a writable server of the key's group.
*
* The target group is chosen as (unsigned)key_hash_code % group_count.
* At most pGroup->count + 1 attempts are made; a new attempt happens only
* in keep-alive mode when the previous one ended with ENOTCONN.
*
* parameters:
*	pGroupArray: the group array to pick the target group from
*	bKeepAlive:  true to keep the connection open after the request
*	pKeyInfo:    the key (namespace, object id and key name)
* return: the result of fdht_client_delete, or the error code reported by
*	get_writable_connection when no connection could be obtained
*/
int fdht_delete_ex(GroupArray *pGroupArray, const bool bKeepAlive,
		FDHTKeyInfo *pKeyInfo)
{
	char hash_key[FDHT_MAX_FULL_KEY_LEN + 1];
	int hash_key_len;
	int key_hash_code;
	int group_index;
	int attempt;
	int result;
	ServerArray *pTargetGroup;
	FDHTServerInfo *pConn;

	CALC_KEY_HASH_CODE(pKeyInfo, hash_key, hash_key_len, key_hash_code)
	group_index = ((unsigned int)key_hash_code) % pGroupArray->group_count;
	pTargetGroup = pGroupArray->groups + group_index;

	for (attempt = 0; attempt <= pTargetGroup->count; attempt++)
	{
		pConn = get_writable_connection(pTargetGroup, bKeepAlive,
				key_hash_code, &result);
		if (pConn == NULL)
		{
			return result;
		}

		result = fdht_client_delete(pConn, bKeepAlive, time(NULL),
				FDHT_PROTO_CMD_DEL, key_hash_code, pKeyInfo);

		if (!bKeepAlive)
		{
			/* short connection: always close after the request */
			fdht_disconnect_server(pConn);
			break;
		}

		if (result < ENETDOWN)
		{
			/* not a network error: leave the connection open */
			break;
		}

		/* network error: drop the connection */
		fdht_disconnect_server(pConn);
		if (result != ENOTCONN)
		{
			break;
		}

		/* ENOTCONN: fall through to the next attempt */
	}

	return result;
}
1306 
/**
* Try to connect to every server in pGroupArray->servers.
*
* parameters:
*	pGroupArray:   the group array whose servers are connected
*	bKeepAlive:    when true (or when the group array uses a proxy),
*	               tcpsetnodelay is called on each connected socket
*	success_count: out, number of servers connected successfully
*	fail_count:    out, number of servers that failed to connect
* return: ENOENT when the server array is NULL or no server connected
*	(e.g. the array is empty); the error code of the last failed
*	connect when at least one failed; 0 when all connects succeeded
*/
int fdht_connect_all_servers(GroupArray *pGroupArray, const bool bKeepAlive,
			int *success_count, int *fail_count)
{
	FDHTServerInfo *pCurrent;
	FDHTServerInfo *pLast;
	int last_error;
	int rc;

	*success_count = 0;
	*fail_count = 0;
	if (pGroupArray->servers == NULL)
	{
		return ENOENT;
	}

	last_error = 0;
	pLast = pGroupArray->servers + pGroupArray->server_count;
	pCurrent = pGroupArray->servers;
	while (pCurrent < pLast)
	{
		rc = fdht_connect_server_nb(pCurrent, g_fdht_connect_timeout);
		if (rc == 0)
		{
			(*success_count)++;
			if (bKeepAlive || pGroupArray->use_proxy)
			{
				/* NOTE(review): 3600 is presumably a timeout
				 * in seconds -- confirm with tcpsetnodelay */
				tcpsetnodelay(pCurrent->sock, 3600);
			}
		}
		else
		{
			/* remember the most recent failure, keep going */
			last_error = rc;
			(*fail_count)++;
		}

		pCurrent++;
	}

	if (last_error != 0)
	{
		return last_error;
	}

	return (*success_count > 0) ? 0 : ENOENT;
}
1353 
/**
* Close the socket of every server in pGroupArray->servers that currently
* has one (sock >= 0) and mark it as closed by setting sock to -1.
* Unless the group array uses a proxy, fdht_quit is called on the server
* before its socket is closed.
*
* parameters:
*	pGroupArray: the group array whose servers are disconnected
*/
void fdht_disconnect_all_servers(GroupArray *pGroupArray)
{
	FDHTServerInfo *pCurrent;
	FDHTServerInfo *pLast;

	if (pGroupArray->servers == NULL)
	{
		return;
	}

	pLast = pGroupArray->servers + pGroupArray->server_count;
	for (pCurrent = pGroupArray->servers; pCurrent < pLast; pCurrent++)
	{
		if (pCurrent->sock < 0)
		{
			/* no open socket for this server */
			continue;
		}

		if (!pGroupArray->use_proxy)
		{
			fdht_quit(pCurrent);
		}

		close(pCurrent->sock);
		pCurrent->sock = -1;
	}
}
1377 
/**
* Query one server with a STAT request and store the response body in buff.
*
* buff is zero-filled first and a response of size bytes or more is
* rejected, so on success the content of buff is NUL terminated.
* At most two attempts are made; the second one happens only in keep-alive
* mode when the first ended with ENOTCONN.
*
* parameters:
*	pGroupArray:  the group array holding the server list
*	bKeepAlive:   true to keep the connection open after the request
*	server_index: index into pGroupArray->servers,
*	              valid range is [0, server_count)
*	buff:         output buffer for the response body
*	size:         size of buff in bytes
* return: 0 for success, EINVAL for an out-of-range server_index, ENOSPC
*	when the response does not fit into buff, otherwise the error code
*	of the failing connect / send / recv call
*/
int fdht_stat_ex(GroupArray *pGroupArray, const bool bKeepAlive, \
		const int server_index, char *buff, const int size)
{
	int result;
	int in_bytes;
	int i;
	FDHTProtoHeader header;
	FDHTServerInfo *pServer;

	memset(buff, 0, size);

	/* server_index == server_count is already one past the last server,
	 * so it must be rejected too (this also covers an empty list) */
	if (server_index < 0 || server_index >= pGroupArray->server_count)
	{
		logError("invalid servier_index: %d", server_index);
		return EINVAL;
	}

	pServer = pGroupArray->servers + server_index;
	for (i=0; i<2; i++)
	{
		if ((result=fdht_connect_server_nb(pServer, \
				g_fdht_connect_timeout)) != 0)
		{
			return result;
		}

		if (bKeepAlive)
		{
			tcpsetnodelay(pServer->sock, 3600);
		}

		/* the STAT request consists of a header only */
		memset(&header, 0, sizeof(header));
		header.cmd = FDHT_PROTO_CMD_STAT;
		header.keep_alive = bKeepAlive;
		int2buff((int)time(NULL), header.timestamp);

		do
		{
			if ((result=tcpsenddata_nb(pServer->sock, &header, \
				sizeof(header), g_fdht_network_timeout)) != 0)
			{
				logError("send data to server %s:%d fail, " \
					"errno: %d, error info: %s", \
					pServer->ip_addr, pServer->port, \
					result, STRERROR(result));
				break;
			}

			if ((result=fdht_recv_header(pServer, &in_bytes)) != 0)
			{
				break;
			}

			/* keep at least one zero byte at the end of buff.
			 * NOTE(review): in keep-alive mode the response body
			 * appears to stay unread on the socket after this
			 * ENOSPC -- confirm whether the connection should be
			 * dropped here. */
			if (in_bytes >= size)
			{
				logError("server %s:%d reponse bytes: %d >= " \
					"buff size: %d", pServer->ip_addr, \
					pServer->port, in_bytes, size);
				result = ENOSPC;
				break;
			}

			if ((result=tcprecvdata_nb(pServer->sock, buff, \
				in_bytes, g_fdht_network_timeout)) != 0)
			{
				logError("file: "__FILE__", line: %d, " \
					"server: %s:%d, recv data fail, " \
					"errno: %d, error info: %s", \
					__LINE__, pServer->ip_addr, \
					pServer->port, \
					result, STRERROR(result));
				break;
			}
		} while (0);

		if (bKeepAlive)
		{
			if (result >= ENETDOWN) //network error
			{
				fdht_disconnect_server(pServer);
				if (result == ENOTCONN)
				{
					continue;  //retry
				}
			}
		}
		else
		{
			fdht_disconnect_server(pServer);
		}

		break;
	}

	return result;
}
1473 
1474