1 /**
2 * Copyright (C) 2008 Happy Fish / YuQing
3 *
4 * FastDFS may be copied only under the terms of the GNU General
5 * Public License V3, which may be found in the FastDFS source kit.
6 * Please visit the FastDFS Home Page http://www.fastken.com/ for more detail.
7 **/
8
9 #include <stdio.h>
10 #include <stdlib.h>
11 #include <string.h>
12 #include <unistd.h>
13 #include <string.h>
14 #include <errno.h>
15 #include <sys/types.h>
16 #include "fastcommon/sockopt.h"
17 #include "fastcommon/logger.h"
18 #include "fastcommon/hash.h"
19 #include "fastcommon/shared_func.h"
20 #include "fastcommon/ini_file_reader.h"
21 #include "fdht_types.h"
22 #include "fdht_proto.h"
23 #include "fdht_global.h"
24 #include "fdht_client.h"
25
26 GroupArray g_group_array = {NULL, 0};
27 bool g_keep_alive = false;
28
fdht_proxy_extra_deal(GroupArray * pGroupArray,bool * bKeepAlive)29 static void fdht_proxy_extra_deal(GroupArray *pGroupArray, bool *bKeepAlive)
30 {
31 int group_id;
32 ServerArray *pServerArray;
33 FDHTServerInfo **ppServer;
34 FDHTServerInfo **ppServerEnd;
35
36 if (!pGroupArray->use_proxy)
37 {
38 return;
39 }
40
41 *bKeepAlive = true;
42 pGroupArray->server_count = 1;
43 memcpy(pGroupArray->servers, &pGroupArray->proxy_server, \
44 sizeof(FDHTServerInfo));
45
46 pServerArray = pGroupArray->groups;
47 for (group_id=0; group_id<pGroupArray->group_count; group_id++)
48 {
49 ppServerEnd = pServerArray->servers + pServerArray->count;
50 for (ppServer=pServerArray->servers; \
51 ppServer<ppServerEnd; ppServer++)
52 {
53 *ppServer = pGroupArray->servers;
54 }
55
56 pServerArray++;
57 }
58 }
59
fdht_client_init(const char * filename)60 int fdht_client_init(const char *filename)
61 {
62 char *pBasePath;
63 IniContext iniContext;
64 char szProxyPrompt[64];
65 int result;
66
67 memset(&iniContext, 0, sizeof(IniContext));
68 if ((result=iniLoadFromFile(filename, &iniContext)) != 0)
69 {
70 logError("load conf file \"%s\" fail, ret code: %d", \
71 filename, result);
72 return result;
73 }
74
75 //iniPrintItems(&iniContext);
76
77 while (1)
78 {
79 pBasePath = iniGetStrValue(NULL, "base_path", &iniContext);
80 if (pBasePath == NULL)
81 {
82 logError("conf file \"%s\" must have item " \
83 "\"base_path\"!", filename);
84 result = ENOENT;
85 break;
86 }
87
88 snprintf(g_fdht_base_path, sizeof(g_fdht_base_path), "%s", pBasePath);
89 chopPath(g_fdht_base_path);
90 if (!fileExists(g_fdht_base_path))
91 {
92 logError("\"%s\" can't be accessed, error info: %s", \
93 g_fdht_base_path, STRERROR(errno));
94 result = errno != 0 ? errno : ENOENT;
95 break;
96 }
97 if (!isDir(g_fdht_base_path))
98 {
99 logError("\"%s\" is not a directory!", g_fdht_base_path);
100 result = ENOTDIR;
101 break;
102 }
103
104 g_fdht_connect_timeout = iniGetIntValue(NULL, "connect_timeout", \
105 &iniContext, DEFAULT_CONNECT_TIMEOUT);
106 if (g_fdht_connect_timeout <= 0)
107 {
108 g_fdht_connect_timeout = DEFAULT_CONNECT_TIMEOUT;
109 }
110
111 g_fdht_network_timeout = iniGetIntValue(NULL, "network_timeout", \
112 &iniContext, DEFAULT_NETWORK_TIMEOUT);
113 if (g_fdht_network_timeout <= 0)
114 {
115 g_fdht_network_timeout = DEFAULT_NETWORK_TIMEOUT;
116 }
117
118 g_keep_alive = iniGetBoolValue(NULL, "keep_alive", \
119 &iniContext, false);
120
121 if ((result=fdht_load_groups(&iniContext, \
122 &g_group_array)) != 0)
123 {
124 break;
125 }
126
127 if (g_group_array.use_proxy)
128 {
129 sprintf(szProxyPrompt, "proxy_addr=%s, proxy_port=%d, ",
130 g_group_array.proxy_server.ip_addr,
131 g_group_array.proxy_server.port);
132 }
133 else
134 {
135 *szProxyPrompt = '\0';
136 }
137
138 load_log_level(&iniContext);
139
140 logInfo("file: "__FILE__", line: %d, " \
141 "base_path=%s, " \
142 "connect_timeout=%ds, network_timeout=%ds, " \
143 "keep_alive=%d, use_proxy=%d, %s"\
144 "group_count=%d, server_count=%d", __LINE__, \
145 g_fdht_base_path, g_fdht_connect_timeout, \
146 g_fdht_network_timeout, g_keep_alive, \
147 g_group_array.use_proxy, szProxyPrompt, \
148 g_group_array.group_count, g_group_array.server_count);
149
150 fdht_proxy_extra_deal(&g_group_array, &g_keep_alive);
151
152 break;
153 }
154
155 iniFreeContext(&iniContext);
156
157 return result;
158 }
159
fdht_load_conf(const char * filename,GroupArray * pGroupArray,bool * bKeepAlive)160 int fdht_load_conf(const char *filename, GroupArray *pGroupArray, \
161 bool *bKeepAlive)
162 {
163 IniContext iniContext;
164 int result;
165
166 if ((result=iniLoadFromFile(filename, &iniContext)) != 0)
167 {
168 logError("file: "__FILE__", line: %d, " \
169 "load conf file \"%s\" fail, " \
170 "ret code: %d", __LINE__, \
171 filename, result);
172 return result;
173 }
174
175 *bKeepAlive = iniGetBoolValue(NULL, "keep_alive", &iniContext, false);
176 if ((result=fdht_load_groups(&iniContext, pGroupArray)) != 0)
177 {
178 iniFreeContext(&iniContext);
179 return result;
180 }
181
182 fdht_proxy_extra_deal(pGroupArray, bKeepAlive);
183
184 iniFreeContext(&iniContext);
185 return 0;
186 }
187
fdht_client_destroy()188 void fdht_client_destroy()
189 {
190 fdht_free_group_array(&g_group_array);
191 }
192
193 #define get_readable_connection(pServerArray, bKeepAlive, hash_code, err_no) \
194 get_connection(pServerArray, bKeepAlive, hash_code, err_no)
195
196 #define get_writable_connection(pServerArray, bKeepAlive, hash_code, err_no) \
197 get_connection(pServerArray, bKeepAlive, hash_code, err_no)
198
get_connection(ServerArray * pServerArray,const bool bKeepAlive,const int hash_code,int * err_no)199 static FDHTServerInfo *get_connection(ServerArray *pServerArray, \
200 const bool bKeepAlive, const int hash_code, int *err_no)
201 {
202 FDHTServerInfo **ppServer;
203 FDHTServerInfo **ppEnd;
204 int server_index;
205 int new_hash_code;
206
207 new_hash_code = (hash_code << 16) | (hash_code >> 16);
208 if (new_hash_code < 0)
209 {
210 new_hash_code &= 0x7FFFFFFF;
211 }
212 server_index = new_hash_code % pServerArray->count;
213 ppEnd = pServerArray->servers + pServerArray->count;
214 for (ppServer = pServerArray->servers + server_index; \
215 ppServer<ppEnd; ppServer++)
216 {
217 if ((*ppServer)->sock > 0) //already connected
218 {
219 return *ppServer;
220 }
221
222 if (fdht_connect_server_nb(*ppServer, \
223 g_fdht_connect_timeout) == 0)
224 {
225 if (bKeepAlive)
226 {
227 tcpsetnodelay((*ppServer)->sock, 3600);
228 }
229 return *ppServer;
230 }
231 }
232
233 ppEnd = pServerArray->servers + server_index;
234 for (ppServer = pServerArray->servers; ppServer<ppEnd; ppServer++)
235 {
236 if ((*ppServer)->sock > 0) //already connected
237 {
238 return *ppServer;
239 }
240
241 if (fdht_connect_server_nb(*ppServer, \
242 g_fdht_connect_timeout) == 0)
243 {
244 if (bKeepAlive)
245 {
246 tcpsetnodelay((*ppServer)->sock, 3600);
247 }
248 return *ppServer;
249 }
250 }
251
252 *err_no = ENOENT;
253 return NULL;
254 }
255
256 #define CALC_KEY_HASH_CODE(pKeyInfo, hash_key, hash_key_len, key_hash_code) \
257 if (pKeyInfo->namespace_len > FDHT_MAX_NAMESPACE_LEN) \
258 { \
259 fprintf(stderr, "namespace length: %d exceeds, " \
260 "max length: %d\n", \
261 pKeyInfo->namespace_len, FDHT_MAX_NAMESPACE_LEN); \
262 return EINVAL; \
263 } \
264 \
265 if (pKeyInfo->obj_id_len > FDHT_MAX_OBJECT_ID_LEN) \
266 { \
267 fprintf(stderr, "object ID length: %d exceeds, " \
268 "max length: %d\n", \
269 pKeyInfo->obj_id_len, FDHT_MAX_OBJECT_ID_LEN); \
270 return EINVAL; \
271 } \
272 \
273 if (pKeyInfo->key_len > FDHT_MAX_SUB_KEY_LEN) \
274 { \
275 fprintf(stderr, "key length: %d exceeds, max length: %d\n", \
276 pKeyInfo->key_len, FDHT_MAX_SUB_KEY_LEN); \
277 return EINVAL; \
278 } \
279 \
280 if (pKeyInfo->namespace_len == 0 && pKeyInfo->obj_id_len == 0) \
281 { \
282 hash_key_len = pKeyInfo->key_len; \
283 memcpy(hash_key, pKeyInfo->szKey, pKeyInfo->key_len); \
284 } \
285 else if (pKeyInfo->namespace_len > 0 && pKeyInfo->obj_id_len > 0) \
286 { \
287 hash_key_len = pKeyInfo->namespace_len+1+pKeyInfo->obj_id_len; \
288 memcpy(hash_key,pKeyInfo->szNameSpace,pKeyInfo->namespace_len);\
289 *(hash_key + pKeyInfo->namespace_len)=FDHT_FULL_KEY_SEPERATOR; \
290 memcpy(hash_key + pKeyInfo->namespace_len + 1, \
291 pKeyInfo->szObjectId, pKeyInfo->obj_id_len); \
292 } \
293 else \
294 { \
295 fprintf(stderr, "invalid namespace length: %d and " \
296 "object ID length: %d\n", \
297 pKeyInfo->namespace_len, pKeyInfo->obj_id_len); \
298 return EINVAL; \
299 } \
300 \
301 key_hash_code = PJWHash(hash_key, hash_key_len); \
302 if (key_hash_code < 0) \
303 { \
304 key_hash_code &= 0x7FFFFFFF; \
305 } \
306
307
308 #define CALC_OBJECT_HASH_CODE(pObjectInfo, hash_key, hash_key_len, key_hash_code) \
309 if (pObjectInfo->namespace_len <= 0 || pObjectInfo->obj_id_len <= 0) \
310 { \
311 fprintf(stderr, "invalid namespace length: %d and " \
312 "object ID length: %d\n", \
313 pObjectInfo->namespace_len, pObjectInfo->obj_id_len); \
314 return EINVAL; \
315 } \
316 \
317 if (pObjectInfo->namespace_len > FDHT_MAX_NAMESPACE_LEN) \
318 { \
319 fprintf(stderr, "namespace length: %d exceeds, " \
320 "max length: %d\n", \
321 pObjectInfo->namespace_len, FDHT_MAX_NAMESPACE_LEN); \
322 return EINVAL; \
323 } \
324 \
325 if (pObjectInfo->obj_id_len > FDHT_MAX_OBJECT_ID_LEN) \
326 { \
327 fprintf(stderr, "object ID length: %d exceeds, " \
328 "max length: %d\n", \
329 pObjectInfo->obj_id_len, FDHT_MAX_OBJECT_ID_LEN); \
330 return EINVAL; \
331 } \
332 hash_key_len = pObjectInfo->namespace_len+1+pObjectInfo->obj_id_len; \
333 memcpy(hash_key, pObjectInfo->szNameSpace, pObjectInfo->namespace_len);\
334 *(hash_key + pObjectInfo->namespace_len) = FDHT_FULL_KEY_SEPERATOR; \
335 memcpy(hash_key + pObjectInfo->namespace_len + 1, \
336 pObjectInfo->szObjectId, pObjectInfo->obj_id_len); \
337 \
338 key_hash_code = PJWHash(hash_key, hash_key_len); \
339 if (key_hash_code < 0) \
340 { \
341 key_hash_code &= 0x7FFFFFFF; \
342 } \
343
344
345 /**
346 * request body format:
347 * namespace_len: 4 bytes big endian integer
348 * namespace: can be emtpy
349 * obj_id_len: 4 bytes big endian integer
350 * object_id: the object id (can be empty)
351 * key_len: 4 bytes big endian integer
352 * key: key name
353 * response body format:
354 * value_len: 4 bytes big endian integer
355 * value: value buff
356 */
fdht_get_ex1(GroupArray * pGroupArray,const bool bKeepAlive,FDHTKeyInfo * pKeyInfo,const time_t expires,char ** ppValue,int * value_len,MallocFunc malloc_func)357 int fdht_get_ex1(GroupArray *pGroupArray, const bool bKeepAlive, \
358 FDHTKeyInfo *pKeyInfo, const time_t expires, \
359 char **ppValue, int *value_len, MallocFunc malloc_func)
360 {
361 int result;
362 FDHTProtoHeader *pHeader;
363 char hash_key[FDHT_MAX_FULL_KEY_LEN + 1];
364 char buff[sizeof(FDHTProtoHeader) + FDHT_MAX_FULL_KEY_LEN + 16];
365 int in_bytes;
366 int vlen;
367 int group_id;
368 int hash_key_len;
369 int key_hash_code;
370 int i;
371 ServerArray *pGroup;
372 FDHTServerInfo *pServer;
373 char *p;
374
375 CALC_KEY_HASH_CODE(pKeyInfo, hash_key, hash_key_len, key_hash_code)
376 group_id = ((unsigned int)key_hash_code) % pGroupArray->group_count;
377 //printf("get group_id=%d\n", group_id);
378
379 pGroup = pGroupArray->groups + group_id;
380 for (i=0; i<=pGroup->count; i++)
381 {
382 pServer = get_readable_connection(pGroup, bKeepAlive, \
383 key_hash_code, &result);
384 if (pServer == NULL)
385 {
386 return result;
387 }
388
389 memset(buff, 0, sizeof(buff));
390 pHeader = (FDHTProtoHeader *)buff;
391
392 pHeader->cmd = FDHT_PROTO_CMD_GET;
393 pHeader->keep_alive = bKeepAlive;
394 int2buff((int)time(NULL), pHeader->timestamp);
395 int2buff((int)expires, pHeader->expires);
396 int2buff(key_hash_code, pHeader->key_hash_code);
397 int2buff(12 + pKeyInfo->namespace_len + pKeyInfo->obj_id_len + \
398 pKeyInfo->key_len, pHeader->pkg_len);
399
400 do
401 {
402 p = buff + sizeof(FDHTProtoHeader);
403 PACK_BODY_UNTIL_KEY(pKeyInfo, p)
404 if ((result=tcpsenddata_nb(pServer->sock, buff, p - buff, \
405 g_fdht_network_timeout)) != 0)
406 {
407 logError("send data to server %s:%d fail, " \
408 "errno: %d, error info: %s", \
409 pServer->ip_addr, pServer->port, \
410 result, STRERROR(result));
411 break;
412 }
413
414 if ((result=fdht_recv_header(pServer, &in_bytes)) != 0)
415 {
416 break;
417 }
418
419 if (in_bytes < 4)
420 {
421 logError("server %s:%d reponse bytes: %d < 4", \
422 pServer->ip_addr, pServer->port, in_bytes);
423 result = EINVAL;
424 break;
425 }
426
427 if ((result=tcprecvdata_nb(pServer->sock, buff, \
428 4, g_fdht_network_timeout)) != 0)
429 {
430 logError("file: "__FILE__", line: %d, " \
431 "server: %s:%d, recv data fail, " \
432 "errno: %d, error info: %s", \
433 __LINE__, pServer->ip_addr, \
434 pServer->port, \
435 result, STRERROR(result));
436 break;
437 }
438
439 vlen = buff2int(buff);
440 if (vlen != in_bytes - 4)
441 {
442 logError("server %s:%d reponse bytes: %d " \
443 "is not correct, %d != %d", pServer->ip_addr, \
444 pServer->port, in_bytes, vlen, in_bytes - 4);
445 result = EINVAL;
446 break;
447 }
448
449 if (*ppValue != NULL)
450 {
451 if (vlen >= *value_len)
452 {
453 *value_len = 0;
454 result = ENOSPC;
455 break;
456 }
457
458 *value_len = vlen;
459 }
460 else
461 {
462 *value_len = vlen;
463 *ppValue = (char *)malloc_func((*value_len + 1));
464 if (*ppValue == NULL)
465 {
466 *value_len = 0;
467 logError("malloc %d bytes fail, " \
468 "errno: %d, error info: %s", \
469 *value_len + 1, errno, STRERROR(errno));
470 result = errno != 0 ? errno : ENOMEM;
471 break;
472 }
473 }
474
475 if ((result=tcprecvdata_nb(pServer->sock, *ppValue, \
476 *value_len, g_fdht_network_timeout)) != 0)
477 {
478 logError("file: "__FILE__", line: %d, " \
479 "server: %s:%d, recv data fail, " \
480 "errno: %d, error info: %s", \
481 __LINE__, pServer->ip_addr, \
482 pServer->port, \
483 result, STRERROR(result));
484 break;
485 }
486
487 *(*ppValue + *value_len) = '\0';
488 } while(0);
489
490 if (bKeepAlive)
491 {
492 if (result >= ENETDOWN) //network error
493 {
494 fdht_disconnect_server(pServer);
495 if (result == ENOTCONN)
496 {
497 continue; //retry
498 }
499 }
500 }
501 else
502 {
503 fdht_disconnect_server(pServer);
504 }
505
506 break;
507 }
508
509 return result;
510 }
511
fdht_batch_set_ex(GroupArray * pGroupArray,const bool bKeepAlive,FDHTObjectInfo * pObjectInfo,FDHTKeyValuePair * key_list,const int key_count,const time_t expires,int * success_count)512 int fdht_batch_set_ex(GroupArray *pGroupArray, const bool bKeepAlive, \
513 FDHTObjectInfo *pObjectInfo, FDHTKeyValuePair *key_list, \
514 const int key_count, const time_t expires, int *success_count)
515 {
516 int result;
517 FDHTProtoHeader *pHeader;
518 char hash_key[FDHT_MAX_FULL_KEY_LEN + 1];
519 char buff[sizeof(FDHTProtoHeader) + FDHT_MAX_FULL_KEY_LEN + \
520 (8 + FDHT_MAX_SUB_KEY_LEN) * FDHT_MAX_KEY_COUNT_PER_REQ + \
521 32 * 1024];
522 char *pBuff;
523 int in_bytes;
524 int total_key_len;
525 int total_value_len;
526 int pkg_total_len;
527 int group_id;
528 int hash_key_len;
529 int key_hash_code;
530 int i;
531 ServerArray *pGroup;
532 FDHTServerInfo *pServer;
533 FDHTKeyValuePair *pKeyValuePair;
534 FDHTKeyValuePair *pKeyValueEnd;
535 char *p;
536
537 *success_count = 0;
538 if (key_count <= 0 || key_count > FDHT_MAX_KEY_COUNT_PER_REQ)
539 {
540 logError("invalid key_count: %d", key_count);
541 return EINVAL;
542 }
543
544 CALC_OBJECT_HASH_CODE(pObjectInfo, hash_key, hash_key_len, key_hash_code)
545 group_id = ((unsigned int)key_hash_code) % pGroupArray->group_count;
546 pGroup = pGroupArray->groups + group_id;
547 for (i=0; i<=pGroup->count; i++)
548 {
549 pServer = get_writable_connection(pGroup, bKeepAlive, \
550 key_hash_code, &result);
551 if (pServer == NULL)
552 {
553 return result;
554 }
555
556 total_key_len = 0;
557 total_value_len = 0;
558 pKeyValueEnd = key_list + key_count;
559 for (pKeyValuePair=key_list; pKeyValuePair<pKeyValueEnd; pKeyValuePair++)
560 {
561 total_key_len += pKeyValuePair->key_len;
562 total_value_len += pKeyValuePair->value_len;
563 }
564 pkg_total_len = sizeof(FDHTProtoHeader) + 12 + pObjectInfo->namespace_len + \
565 pObjectInfo->obj_id_len + 8 * key_count + \
566 total_key_len + total_value_len;
567
568 if (pkg_total_len <= sizeof(buff))
569 {
570 pBuff = buff;
571 }
572 else
573 {
574 pBuff = (char *)malloc(pkg_total_len);
575 if (pBuff == NULL)
576 {
577 result = errno != 0 ? errno : ENOMEM;
578 logError("malloc %d bytes fail, " \
579 "errno: %d, error info: %s", \
580 pkg_total_len, result, STRERROR(result));
581 return result;
582 }
583 }
584
585 memset(pBuff, 0, pkg_total_len);
586 pHeader = (FDHTProtoHeader *)pBuff;
587
588 pHeader->cmd = FDHT_PROTO_CMD_BATCH_SET;
589 pHeader->keep_alive = bKeepAlive;
590 int2buff((int)time(NULL), pHeader->timestamp);
591 int2buff((int)expires, pHeader->expires);
592 int2buff(key_hash_code, pHeader->key_hash_code);
593
594 p = pBuff + sizeof(FDHTProtoHeader);
595 PACK_BODY_OBJECT(pObjectInfo, p)
596 int2buff(key_count, p);
597 p += 4;
598
599 for (pKeyValuePair=key_list; pKeyValuePair<pKeyValueEnd; pKeyValuePair++)
600 {
601 int2buff(pKeyValuePair->key_len, p);
602 memcpy(p + 4, pKeyValuePair->szKey, pKeyValuePair->key_len);
603 p += 4 + pKeyValuePair->key_len;
604
605 int2buff(pKeyValuePair->value_len, p);
606 memcpy(p + 4, pKeyValuePair->pValue, pKeyValuePair->value_len);
607 p += 4 + pKeyValuePair->value_len;
608 }
609
610 do
611 {
612 int2buff(pkg_total_len - sizeof(FDHTProtoHeader), pHeader->pkg_len);
613 if ((result=tcpsenddata_nb(pServer->sock, pBuff, pkg_total_len, \
614 g_fdht_network_timeout)) != 0)
615 {
616 logError("send data to server %s:%d fail, " \
617 "errno: %d, error info: %s", \
618 pServer->ip_addr, pServer->port, \
619 result, STRERROR(result));
620 break;
621 }
622
623 if ((result=fdht_recv_header(pServer, &in_bytes)) != 0)
624 {
625 break;
626 }
627
628 if (in_bytes != 8 + 5 * key_count + total_key_len)
629 {
630 logError("server %s:%d reponse bytes: %d != %d", \
631 pServer->ip_addr, pServer->port, in_bytes, \
632 8 + 5 * key_count + total_key_len);
633 result = EINVAL;
634 break;
635 }
636
637 if ((result=tcprecvdata_nb(pServer->sock, pBuff, \
638 in_bytes, g_fdht_network_timeout)) != 0)
639 {
640 logError("file: "__FILE__", line: %d, " \
641 "server: %s:%d, recv data fail, " \
642 "errno: %d, error info: %s", \
643 __LINE__, pServer->ip_addr, pServer->port, \
644 result, STRERROR(result));
645 break;
646 }
647
648 if (buff2int(pBuff) != key_count)
649 {
650 result = EINVAL;
651 logError("file: "__FILE__", line: %d, " \
652 "server: %s:%d, invalid key_count: %d, " \
653 "expect key count: %d", \
654 __LINE__, pServer->ip_addr, pServer->port, \
655 buff2int(pBuff), key_count);
656 break;
657 }
658
659 *success_count = buff2int(pBuff + 4);
660 p = pBuff + 8;
661 for (pKeyValuePair=key_list; pKeyValuePair<pKeyValueEnd; \
662 pKeyValuePair++)
663 {
664 pKeyValuePair->key_len = buff2int(p);
665
666 memcpy(pKeyValuePair->szKey, p + 4, \
667 pKeyValuePair->key_len);
668 p += 4 + pKeyValuePair->key_len;
669 pKeyValuePair->status = *p++;
670 }
671 } while (0);
672
673 if (pBuff != buff)
674 {
675 free(pBuff);
676 }
677
678 if (bKeepAlive)
679 {
680 if (result >= ENETDOWN) //network error
681 {
682 fdht_disconnect_server(pServer);
683 if (result == ENOTCONN)
684 {
685 continue; //retry
686 }
687 }
688 }
689 else
690 {
691 fdht_disconnect_server(pServer);
692 }
693 break;
694 }
695
696 return result;
697 }
698
fdht_batch_delete_ex(GroupArray * pGroupArray,const bool bKeepAlive,FDHTObjectInfo * pObjectInfo,FDHTKeyValuePair * key_list,const int key_count,int * success_count)699 int fdht_batch_delete_ex(GroupArray *pGroupArray, const bool bKeepAlive, \
700 FDHTObjectInfo *pObjectInfo, FDHTKeyValuePair *key_list, \
701 const int key_count, int *success_count)
702 {
703 int result;
704 FDHTProtoHeader *pHeader;
705 char hash_key[FDHT_MAX_FULL_KEY_LEN + 1];
706 char buff[sizeof(FDHTProtoHeader) + FDHT_MAX_FULL_KEY_LEN + 8 + \
707 (5 + FDHT_MAX_SUB_KEY_LEN) * FDHT_MAX_KEY_COUNT_PER_REQ];
708 int in_bytes;
709 int total_key_len;
710 int group_id;
711 int hash_key_len;
712 int key_hash_code;
713 int i;
714 ServerArray *pGroup;
715 FDHTServerInfo *pServer;
716 FDHTKeyValuePair *pKeyValuePair;
717 FDHTKeyValuePair *pKeyValueEnd;
718 char *p;
719
720 *success_count = 0;
721 if (key_count <= 0 || key_count > FDHT_MAX_KEY_COUNT_PER_REQ)
722 {
723 logError("invalid key_count: %d", key_count);
724 return EINVAL;
725 }
726
727 CALC_OBJECT_HASH_CODE(pObjectInfo, hash_key, hash_key_len, key_hash_code)
728 group_id = ((unsigned int)key_hash_code) % pGroupArray->group_count;
729 pGroup = pGroupArray->groups + group_id;
730 for (i=0; i<=pGroup->count; i++)
731 {
732 pServer = get_readable_connection(pGroup, bKeepAlive, \
733 key_hash_code, &result);
734 if (pServer == NULL)
735 {
736 return result;
737 }
738
739 memset(buff, 0, sizeof(buff));
740 pHeader = (FDHTProtoHeader *)buff;
741
742 pHeader->cmd = FDHT_PROTO_CMD_BATCH_DEL;
743 pHeader->keep_alive = bKeepAlive;
744 int2buff((int)time(NULL), pHeader->timestamp);
745 int2buff(key_hash_code, pHeader->key_hash_code);
746
747 p = buff + sizeof(FDHTProtoHeader);
748 PACK_BODY_OBJECT(pObjectInfo, p)
749 int2buff(key_count, p);
750 p += 4;
751
752 total_key_len = 0;
753 pKeyValueEnd = key_list + key_count;
754 for (pKeyValuePair=key_list; pKeyValuePair<pKeyValueEnd; pKeyValuePair++)
755 {
756 int2buff(pKeyValuePair->key_len, p);
757 memcpy(p + 4, pKeyValuePair->szKey, pKeyValuePair->key_len);
758 p += 4 + pKeyValuePair->key_len;
759
760 total_key_len += pKeyValuePair->key_len;
761 }
762
763 do
764 {
765 int2buff((p - buff) - sizeof(FDHTProtoHeader), pHeader->pkg_len);
766 if ((result=tcpsenddata_nb(pServer->sock, buff, p - buff, \
767 g_fdht_network_timeout)) != 0)
768 {
769 logError("send data to server %s:%d fail, " \
770 "errno: %d, error info: %s", \
771 pServer->ip_addr, pServer->port, \
772 result, STRERROR(result));
773 break;
774 }
775
776 if ((result=fdht_recv_header(pServer, &in_bytes)) != 0)
777 {
778 break;
779 }
780
781 if (in_bytes != 8 + 5 * key_count + total_key_len)
782 {
783 logError("server %s:%d reponse bytes: %d != %d", \
784 pServer->ip_addr, pServer->port, in_bytes, \
785 8 + 5 * key_count + total_key_len);
786 result = EINVAL;
787 break;
788 }
789
790 if ((result=tcprecvdata_nb(pServer->sock, buff, \
791 in_bytes, g_fdht_network_timeout)) != 0)
792 {
793 logError("file: "__FILE__", line: %d, " \
794 "server: %s:%d, recv data fail, " \
795 "errno: %d, error info: %s", \
796 __LINE__, pServer->ip_addr, pServer->port, \
797 result, STRERROR(result));
798 break;
799 }
800
801 if (buff2int(buff) != key_count)
802 {
803 result = EINVAL;
804 logError("file: "__FILE__", line: %d, " \
805 "server: %s:%d, invalid key_count: %d, " \
806 "expect key count: %d", \
807 __LINE__, pServer->ip_addr, pServer->port, \
808 buff2int(buff), key_count);
809 break;
810 }
811
812 *success_count = buff2int(buff + 4);
813 p = buff + 8;
814 for (pKeyValuePair=key_list; pKeyValuePair<pKeyValueEnd; \
815 pKeyValuePair++)
816 {
817 pKeyValuePair->key_len = buff2int(p);
818
819 memcpy(pKeyValuePair->szKey, p + 4, \
820 pKeyValuePair->key_len);
821 p += 4 + pKeyValuePair->key_len;
822 pKeyValuePair->status = *p++;
823 }
824 } while (0);
825
826 if (bKeepAlive)
827 {
828 if (result >= ENETDOWN) //network error
829 {
830 fdht_disconnect_server(pServer);
831 if (result == ENOTCONN)
832 {
833 continue; //retry
834 }
835 }
836 }
837 else
838 {
839 fdht_disconnect_server(pServer);
840 }
841
842 break;
843 }
844
845 return result;
846 }
847
fdht_batch_get_ex1(GroupArray * pGroupArray,const bool bKeepAlive,FDHTObjectInfo * pObjectInfo,FDHTKeyValuePair * key_list,const int key_count,const time_t expires,MallocFunc malloc_func,int * success_count)848 int fdht_batch_get_ex1(GroupArray *pGroupArray, const bool bKeepAlive, \
849 FDHTObjectInfo *pObjectInfo, FDHTKeyValuePair *key_list, \
850 const int key_count, const time_t expires, \
851 MallocFunc malloc_func, int *success_count)
852 {
853 int result;
854 FDHTProtoHeader *pHeader;
855 char hash_key[FDHT_MAX_FULL_KEY_LEN + 1];
856 char buff[sizeof(FDHTProtoHeader) + FDHT_MAX_FULL_KEY_LEN + \
857 (4 + FDHT_MAX_SUB_KEY_LEN) * FDHT_MAX_KEY_COUNT_PER_REQ + \
858 32 * 1024];
859 int in_bytes;
860 int value_len;
861 int group_id;
862 int hash_key_len;
863 int key_hash_code;
864 char *pInBuff;
865 int i;
866 ServerArray *pGroup;
867 FDHTServerInfo *pServer;
868 FDHTKeyValuePair *pKeyValuePair;
869 FDHTKeyValuePair *pKeyValueEnd;
870 char *p;
871
872 *success_count = 0;
873 if (key_count <= 0 || key_count > FDHT_MAX_KEY_COUNT_PER_REQ)
874 {
875 logError("invalid key_count: %d", key_count);
876 return EINVAL;
877 }
878
879 CALC_OBJECT_HASH_CODE(pObjectInfo, hash_key, hash_key_len, key_hash_code)
880 group_id = ((unsigned int)key_hash_code) % pGroupArray->group_count;
881 pGroup = pGroupArray->groups + group_id;
882 for (i=0; i<=pGroup->count; i++)
883 {
884 pServer = get_readable_connection(pGroup, bKeepAlive, \
885 key_hash_code, &result);
886 if (pServer == NULL)
887 {
888 return result;
889 }
890
891 memset(buff, 0, sizeof(buff));
892 pHeader = (FDHTProtoHeader *)buff;
893
894 pHeader->cmd = FDHT_PROTO_CMD_BATCH_GET;
895 pHeader->keep_alive = bKeepAlive;
896 int2buff((int)time(NULL), pHeader->timestamp);
897 int2buff((int)expires, pHeader->expires);
898 int2buff(key_hash_code, pHeader->key_hash_code);
899
900 p = buff + sizeof(FDHTProtoHeader);
901 PACK_BODY_OBJECT(pObjectInfo, p)
902 int2buff(key_count, p);
903 p += 4;
904
905 pKeyValueEnd = key_list + key_count;
906 for (pKeyValuePair=key_list; pKeyValuePair<pKeyValueEnd; pKeyValuePair++)
907 {
908 int2buff(pKeyValuePair->key_len, p);
909 memcpy(p + 4, pKeyValuePair->szKey, pKeyValuePair->key_len);
910 p += 4 + pKeyValuePair->key_len;
911 }
912
913 pInBuff = buff;
914 do
915 {
916 int2buff((p - buff) - sizeof(FDHTProtoHeader), pHeader->pkg_len);
917 if ((result=tcpsenddata_nb(pServer->sock, buff, p - buff, \
918 g_fdht_network_timeout)) != 0)
919 {
920 logError("send data to server %s:%d fail, " \
921 "errno: %d, error info: %s", \
922 pServer->ip_addr, pServer->port, \
923 result, STRERROR(result));
924 break;
925 }
926
927 if ((result=fdht_recv_header(pServer, &in_bytes)) != 0)
928 {
929 break;
930 }
931
932 if (in_bytes < 17)
933 {
934 logError("server %s:%d reponse bytes: %d < 17", \
935 pServer->ip_addr, pServer->port, in_bytes);
936 result = EINVAL;
937 break;
938 }
939
940 if (in_bytes > sizeof(buff))
941 {
942 pInBuff = (char *)malloc(in_bytes);
943 if (pInBuff == NULL)
944 {
945 result = errno != 0 ? errno : ENOMEM;
946 logError("file: "__FILE__", line: %d, " \
947 "malloc %d bytes fail, " \
948 "errno: %d, error info: %s", \
949 __LINE__, in_bytes, \
950 result, STRERROR(result));
951 break;
952 }
953 }
954
955 if ((result=tcprecvdata_nb(pServer->sock, pInBuff, \
956 in_bytes, g_fdht_network_timeout)) != 0)
957 {
958 logError("file: "__FILE__", line: %d, " \
959 "server: %s:%d, recv data fail, " \
960 "errno: %d, error info: %s", \
961 __LINE__, pServer->ip_addr, pServer->port, \
962 result, STRERROR(result));
963 break;
964 }
965
966 if (buff2int(pInBuff) != key_count)
967 {
968 result = EINVAL;
969 logError("file: "__FILE__", line: %d, " \
970 "server: %s:%d, invalid key_count: %d, " \
971 "expect key count: %d", \
972 __LINE__, pServer->ip_addr, pServer->port, \
973 buff2int(pInBuff), key_count);
974 break;
975 }
976
977 *success_count = buff2int(pInBuff + 4);
978 p = pInBuff + 8;
979 for (pKeyValuePair=key_list; pKeyValuePair<pKeyValueEnd; \
980 pKeyValuePair++)
981 {
982 pKeyValuePair->key_len = buff2int(p);
983
984 memcpy(pKeyValuePair->szKey, p + 4, \
985 pKeyValuePair->key_len);
986 p += 4 + pKeyValuePair->key_len;
987 pKeyValuePair->status = *p++;
988 if (pKeyValuePair->status != 0)
989 {
990 pKeyValuePair->value_len = 0;
991 continue;
992 }
993
994 value_len = buff2int(p);
995 p += 4;
996 if (pKeyValuePair->pValue != NULL)
997 {
998 if (value_len >= pKeyValuePair->value_len)
999 {
1000 *(pKeyValuePair->pValue) = '\0';
1001 pKeyValuePair->value_len = 0;
1002 pKeyValuePair->status = ENOSPC;
1003 }
1004 else
1005 {
1006 pKeyValuePair->value_len = value_len;
1007 memcpy(pKeyValuePair->pValue, p, \
1008 value_len);
1009 *(pKeyValuePair->pValue+value_len)='\0';
1010 }
1011 }
1012 else
1013 {
1014 pKeyValuePair->pValue = (char *)malloc_func( \
1015 value_len + 1);
1016 if (pKeyValuePair->pValue == NULL)
1017 {
1018 pKeyValuePair->value_len = 0;
1019 pKeyValuePair->status = errno != 0 ? \
1020 errno : ENOMEM;
1021 logError("malloc %d bytes fail, " \
1022 "errno: %d, error info: %s", \
1023 value_len+1, errno, \
1024 STRERROR(errno));
1025 }
1026 else
1027 {
1028 pKeyValuePair->value_len = value_len;
1029 memcpy(pKeyValuePair->pValue, p, \
1030 value_len);
1031 *(pKeyValuePair->pValue+value_len)='\0';
1032 }
1033 }
1034
1035 p += value_len;
1036 }
1037
1038 if (in_bytes != p - pInBuff)
1039 {
1040 *success_count = 0;
1041 logError("server %s:%d reponse bytes: %d != %d", \
1042 pServer->ip_addr, pServer->port, \
1043 in_bytes, (int)(p - pInBuff));
1044 result = EINVAL;
1045 break;
1046 }
1047 } while (0);
1048
1049 if (pInBuff != buff)
1050 {
1051 free(pInBuff);
1052 }
1053
1054 if (bKeepAlive)
1055 {
1056 if (result >= ENETDOWN) //network error
1057 {
1058 fdht_disconnect_server(pServer);
1059 if (result == ENOTCONN)
1060 {
1061 continue; //retry
1062 }
1063 }
1064 }
1065 else
1066 {
1067 fdht_disconnect_server(pServer);
1068 }
1069
1070 break;
1071 }
1072
1073 return result;
1074 }
1075
fdht_set_ex(GroupArray * pGroupArray,const bool bKeepAlive,FDHTKeyInfo * pKeyInfo,const time_t expires,const char * pValue,const int value_len)1076 int fdht_set_ex(GroupArray *pGroupArray, const bool bKeepAlive, \
1077 FDHTKeyInfo *pKeyInfo, const time_t expires, \
1078 const char *pValue, const int value_len)
1079 {
1080 int result;
1081 char hash_key[FDHT_MAX_FULL_KEY_LEN + 1];
1082 int group_id;
1083 int hash_key_len;
1084 int key_hash_code;
1085 int i;
1086 ServerArray *pGroup;
1087 FDHTServerInfo *pServer;
1088
1089 CALC_KEY_HASH_CODE(pKeyInfo, hash_key, hash_key_len, key_hash_code)
1090 group_id = ((unsigned int)key_hash_code) % pGroupArray->group_count;
1091
1092 pGroup = pGroupArray->groups + group_id;
1093 for (i=0; i<=pGroup->count; i++)
1094 {
1095 pServer = get_writable_connection(pGroup, bKeepAlive, \
1096 key_hash_code, &result);
1097 if (pServer == NULL)
1098 {
1099 return result;
1100 }
1101
1102 //printf("key_hash_code=%d, group_id=%d\n", key_hash_code, group_id);
1103
1104 //printf("set group_id=%d\n", group_id);
1105 result = fdht_client_set(pServer, bKeepAlive, time(NULL), expires, \
1106 FDHT_PROTO_CMD_SET, key_hash_code, \
1107 pKeyInfo, pValue, value_len);
1108
1109 if (bKeepAlive)
1110 {
1111 if (result >= ENETDOWN) //network error
1112 {
1113 fdht_disconnect_server(pServer);
1114 if (result == ENOTCONN)
1115 {
1116 continue; //retry
1117 }
1118 }
1119 }
1120 else
1121 {
1122 fdht_disconnect_server(pServer);
1123 }
1124
1125 break;
1126 }
1127
1128 return result;
1129 }
1130
1131 /**
1132 * request body format:
1133 * namespace_len: 4 bytes big endian integer
1134 * namespace: can be emtpy
1135 * obj_id_len: 4 bytes big endian integer
1136 * object_id: the object id (can be empty)
1137 * key_len: 4 bytes big endian integer
1138 * key: key name
1139 * incr 4 bytes big endian integer
1140 * response body format:
1141 * value_len: 4 bytes big endian integer
1142 * value : value_len bytes
1143 */
fdht_inc_ex(GroupArray * pGroupArray,const bool bKeepAlive,FDHTKeyInfo * pKeyInfo,const time_t expires,const int increase,char * pValue,int * value_len)1144 int fdht_inc_ex(GroupArray *pGroupArray, const bool bKeepAlive, \
1145 FDHTKeyInfo *pKeyInfo, const time_t expires, \
1146 const int increase, char *pValue, int *value_len)
1147 {
1148 int result;
1149 FDHTProtoHeader *pHeader;
1150 char hash_key[FDHT_MAX_FULL_KEY_LEN + 1];
1151 char buff[FDHT_MAX_FULL_KEY_LEN + 32];
1152 char *in_buff;
1153 int in_bytes;
1154 int group_id;
1155 int hash_key_len;
1156 int key_hash_code;
1157 int i;
1158 ServerArray *pGroup;
1159 FDHTServerInfo *pServer;
1160 char *p;
1161
1162 CALC_KEY_HASH_CODE(pKeyInfo, hash_key, hash_key_len, key_hash_code)
1163 group_id = ((unsigned int)key_hash_code) % pGroupArray->group_count;
1164 pGroup = pGroupArray->groups + group_id;
1165 for (i=0; i<=pGroup->count; i++)
1166 {
1167 pServer = get_writable_connection(pGroup, bKeepAlive, \
1168 key_hash_code, &result);
1169 if (pServer == NULL)
1170 {
1171 return result;
1172 }
1173
1174 //printf("inc group_id=%d\n", group_id);
1175
1176 memset(buff, 0, sizeof(buff));
1177 pHeader = (FDHTProtoHeader *)buff;
1178
1179 pHeader->cmd = FDHT_PROTO_CMD_INC;
1180 pHeader->keep_alive = bKeepAlive;
1181 int2buff((int)time(NULL), pHeader->timestamp);
1182 int2buff((int)expires, pHeader->expires);
1183 int2buff(key_hash_code, pHeader->key_hash_code);
1184 int2buff(16 + pKeyInfo->namespace_len + pKeyInfo->obj_id_len + \
1185 pKeyInfo->key_len, pHeader->pkg_len);
1186
1187 while (1)
1188 {
1189 p = buff + sizeof(FDHTProtoHeader);
1190 PACK_BODY_UNTIL_KEY(pKeyInfo, p)
1191 int2buff(increase, p);
1192 p += 4;
1193 if ((result=tcpsenddata_nb(pServer->sock, buff, p - buff, \
1194 g_fdht_network_timeout)) != 0)
1195 {
1196 logError("send data to server %s:%d fail, " \
1197 "errno: %d, error info: %s", \
1198 pServer->ip_addr, pServer->port, \
1199 result, STRERROR(result));
1200 break;
1201 }
1202
1203 in_buff = buff;
1204 if ((result=fdht_recv_response(pServer, &in_buff, \
1205 sizeof(buff), &in_bytes)) != 0)
1206 {
1207 logError("recv data from server %s:%d fail, " \
1208 "errno: %d, error info: %s", \
1209 pServer->ip_addr, pServer->port, \
1210 result, STRERROR(result));
1211 break;
1212 }
1213
1214 if (in_bytes < 4)
1215 {
1216 logError("server %s:%d reponse bytes: %d < 4!", \
1217 pServer->ip_addr, pServer->port, in_bytes);
1218 result = EINVAL;
1219 break;
1220 }
1221
1222 if (in_bytes - 4 >= *value_len)
1223 {
1224 *value_len = 0;
1225 result = ENOSPC;
1226 break;
1227 }
1228
1229 *value_len = in_bytes - 4;
1230 memcpy(pValue, in_buff + 4, *value_len);
1231 *(pValue + (*value_len)) = '\0';
1232 break;
1233 }
1234
1235 if (bKeepAlive)
1236 {
1237 if (result >= ENETDOWN) //network error
1238 {
1239 fdht_disconnect_server(pServer);
1240 if (result == ENOTCONN)
1241 {
1242 continue; //retry
1243 }
1244 }
1245 }
1246 else
1247 {
1248 fdht_disconnect_server(pServer);
1249 }
1250
1251 break;
1252 }
1253
1254 return result;
1255 }
1256
fdht_delete_ex(GroupArray * pGroupArray,const bool bKeepAlive,FDHTKeyInfo * pKeyInfo)1257 int fdht_delete_ex(GroupArray *pGroupArray, const bool bKeepAlive, \
1258 FDHTKeyInfo *pKeyInfo)
1259 {
1260 int result;
1261 char hash_key[FDHT_MAX_FULL_KEY_LEN + 1];
1262 int group_id;
1263 int hash_key_len;
1264 int key_hash_code;
1265 int i;
1266 ServerArray *pGroup;
1267 FDHTServerInfo *pServer;
1268
1269 CALC_KEY_HASH_CODE(pKeyInfo, hash_key, hash_key_len, key_hash_code)
1270 group_id = ((unsigned int)key_hash_code) % pGroupArray->group_count;
1271 pGroup = pGroupArray->groups + group_id;
1272 for (i=0; i<=pGroup->count; i++)
1273 {
1274 pServer = get_writable_connection(pGroup, bKeepAlive, \
1275 key_hash_code , &result);
1276 if (pServer == NULL)
1277 {
1278 return result;
1279 }
1280
1281 //printf("del group_id=%d\n", group_id);
1282 result = fdht_client_delete(pServer, bKeepAlive, time(NULL), \
1283 FDHT_PROTO_CMD_DEL, key_hash_code, pKeyInfo);
1284
1285 if (bKeepAlive)
1286 {
1287 if (result >= ENETDOWN) //network error
1288 {
1289 fdht_disconnect_server(pServer);
1290 if (result == ENOTCONN)
1291 {
1292 continue; //retry
1293 }
1294 }
1295 }
1296 else
1297 {
1298 fdht_disconnect_server(pServer);
1299 }
1300
1301 break;
1302 }
1303
1304 return result;
1305 }
1306
fdht_connect_all_servers(GroupArray * pGroupArray,const bool bKeepAlive,int * success_count,int * fail_count)1307 int fdht_connect_all_servers(GroupArray *pGroupArray, const bool bKeepAlive, \
1308 int *success_count, int *fail_count)
1309 {
1310 FDHTServerInfo *pServerInfo;
1311 FDHTServerInfo *pServerEnd;
1312 int conn_result;
1313 int result;
1314
1315 *success_count = 0;
1316 *fail_count = 0;
1317 if (pGroupArray->servers == NULL)
1318 {
1319 return ENOENT;
1320 }
1321
1322 result = 0;
1323
1324 pServerEnd = pGroupArray->servers + pGroupArray->server_count;
1325 for (pServerInfo=pGroupArray->servers; \
1326 pServerInfo<pServerEnd; pServerInfo++)
1327 {
1328 if ((conn_result=fdht_connect_server_nb(pServerInfo, \
1329 g_fdht_connect_timeout)) != 0)
1330 {
1331 result = conn_result;
1332 (*fail_count)++;
1333 }
1334 else //connect success
1335 {
1336 (*success_count)++;
1337 if (bKeepAlive || pGroupArray->use_proxy)
1338 {
1339 tcpsetnodelay(pServerInfo->sock, 3600);
1340 }
1341 }
1342 }
1343
1344 if (result != 0)
1345 {
1346 return result;
1347 }
1348 else
1349 {
1350 return *success_count > 0 ? 0: ENOENT;
1351 }
1352 }
1353
fdht_disconnect_all_servers(GroupArray * pGroupArray)1354 void fdht_disconnect_all_servers(GroupArray *pGroupArray)
1355 {
1356 FDHTServerInfo *pServerInfo;
1357 FDHTServerInfo *pServerEnd;
1358
1359 if (pGroupArray->servers != NULL)
1360 {
1361 pServerEnd = pGroupArray->servers + pGroupArray->server_count;
1362 for (pServerInfo=pGroupArray->servers; \
1363 pServerInfo<pServerEnd; pServerInfo++)
1364 {
1365 if (pServerInfo->sock >= 0)
1366 {
1367 if (!pGroupArray->use_proxy)
1368 {
1369 fdht_quit(pServerInfo);
1370 }
1371 close(pServerInfo->sock);
1372 pServerInfo->sock = -1;
1373 }
1374 }
1375 }
1376 }
1377
fdht_stat_ex(GroupArray * pGroupArray,const bool bKeepAlive,const int server_index,char * buff,const int size)1378 int fdht_stat_ex(GroupArray *pGroupArray, const bool bKeepAlive, \
1379 const int server_index, char *buff, const int size)
1380 {
1381 int result;
1382 int in_bytes;
1383 int i;
1384 FDHTProtoHeader header;
1385 FDHTServerInfo *pServer;
1386
1387 memset(buff, 0, size);
1388 if (server_index < 0 || server_index > pGroupArray->server_count)
1389 {
1390 logError("invalid servier_index: %d", server_index);
1391 return EINVAL;
1392 }
1393
1394 pServer = pGroupArray->servers + server_index;
1395 for (i=0; i<2; i++)
1396 {
1397 if ((result=fdht_connect_server_nb(pServer, \
1398 g_fdht_connect_timeout)) != 0)
1399 {
1400 return result;
1401 }
1402
1403 if (bKeepAlive)
1404 {
1405 tcpsetnodelay(pServer->sock, 3600);
1406 }
1407
1408 memset(&header, 0, sizeof(header));
1409 header.cmd = FDHT_PROTO_CMD_STAT;
1410 header.keep_alive = bKeepAlive;
1411 int2buff((int)time(NULL), header.timestamp);
1412
1413 do
1414 {
1415 if ((result=tcpsenddata_nb(pServer->sock, &header, \
1416 sizeof(header), g_fdht_network_timeout)) != 0)
1417 {
1418 logError("send data to server %s:%d fail, " \
1419 "errno: %d, error info: %s", \
1420 pServer->ip_addr, pServer->port, \
1421 result, STRERROR(result));
1422 break;
1423 }
1424
1425 if ((result=fdht_recv_header(pServer, &in_bytes)) != 0)
1426 {
1427 break;
1428 }
1429
1430 if (in_bytes >= size)
1431 {
1432 logError("server %s:%d reponse bytes: %d >= " \
1433 "buff size: %d", pServer->ip_addr, \
1434 pServer->port, in_bytes, size);
1435 result = ENOSPC;
1436 break;
1437 }
1438
1439 if ((result=tcprecvdata_nb(pServer->sock, buff, \
1440 in_bytes, g_fdht_network_timeout)) != 0)
1441 {
1442 logError("file: "__FILE__", line: %d, " \
1443 "server: %s:%d, recv data fail, " \
1444 "errno: %d, error info: %s", \
1445 __LINE__, pServer->ip_addr, \
1446 pServer->port, \
1447 result, STRERROR(result));
1448 break;
1449 }
1450 } while (0);
1451
1452 if (bKeepAlive)
1453 {
1454 if (result >= ENETDOWN) //network error
1455 {
1456 fdht_disconnect_server(pServer);
1457 if (result == ENOTCONN)
1458 {
1459 continue; //retry
1460 }
1461 }
1462 }
1463 else
1464 {
1465 fdht_disconnect_server(pServer);
1466 }
1467
1468 break;
1469 }
1470
1471 return result;
1472 }
1473
1474