1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 #if !defined(DLL_EXPORT) && !defined(USE_STATIC_LIB)
20 # define USE_STATIC_LIB
21 #endif
22
23 #if defined(__CYGWIN__)
24 #define USE_IPV6
25 #endif
26
27 #include "config.h"
28 #include <zookeeper.h>
29 #include <zookeeper.jute.h>
30 #include <proto.h>
31 #include "zk_adaptor.h"
32 #include "zookeeper_log.h"
33 #include "zk_hashtable.h"
34
35 #ifdef HAVE_CYRUS_SASL_H
36 #include "zk_sasl.h"
37 #endif /* HAVE_CYRUS_SASL_H */
38
39 #include <stdlib.h>
40 #include <stdio.h>
41 #include <string.h>
42 #include <time.h>
43 #include <errno.h>
44 #include <fcntl.h>
45 #include <assert.h>
46 #include <stdarg.h>
47 #include <limits.h>
48
49 #ifdef HAVE_SYS_TIME_H
50 #include <sys/time.h>
51 #endif
52
53 #ifdef HAVE_SYS_SOCKET_H
54 #include <sys/socket.h>
55 #endif
56
57 #ifdef HAVE_POLL
58 #include <poll.h>
59 #endif
60
61 #ifdef HAVE_NETINET_IN_H
62 #include <netinet/in.h>
63 #include <netinet/tcp.h>
64 #endif
65
66 #ifdef HAVE_ARPA_INET_H
67 #include <arpa/inet.h>
68 #endif
69
70 #ifdef HAVE_NETDB_H
71 #include <netdb.h>
72 #endif
73
74 #ifdef HAVE_UNISTD_H
75 #include <unistd.h> // needed for _POSIX_MONOTONIC_CLOCK
76 #endif
77
78 #ifdef HAVE_SYS_UTSNAME_H
79 #include <sys/utsname.h>
80 #endif
81
82 #ifdef HAVE_GETPWUID_R
83 #include <pwd.h>
84 #endif
85
86 #ifdef HAVE_OPENSSL_H
87 #include <openssl/ssl.h>
88 #include <openssl/err.h>
89 #endif
90
91 #ifdef __MACH__ // OS X
92 #include <mach/clock.h>
93 #include <mach/mach.h>
94 #include <netinet/tcp.h>
95 #endif
96
97 #ifdef WIN32
98 #include <process.h> /* for getpid */
99 #include <direct.h> /* for getcwd */
100 #define EAI_ADDRFAMILY WSAEINVAL /* is this still needed? */
101 #define EHOSTDOWN EPIPE
102 #define ESTALE ENODEV
103 #endif
104
/*
 * Run statement `x` only when the global log level is ZOO_LOG_LEVEL_DEBUG.
 * NOTE(review): expands to a bare `if` (no do { } while (0) wrapper), so a
 * use as the unbraced body of an if/else can capture a following `else`.
 */
#define IF_DEBUG(x) if(logLevel==ZOO_LOG_LEVEL_DEBUG) {x;}

/* Write/read bits (a bit mask; presumably I/O interest flags — confirm at
 * use sites). */
const int ZOOKEEPER_WRITE = 1 << 0;
const int ZOOKEEPER_READ = 1 << 1;

/* Node creation modes. These are enumerated values, not bit flags; use the
 * ZOOKEEPER_IS_SEQUENCE / ZOOKEEPER_IS_TTL macros below to classify them. */
const int ZOO_PERSISTENT = 0;
const int ZOO_EPHEMERAL = 1;
const int ZOO_PERSISTENT_SEQUENTIAL = 2;
const int ZOO_EPHEMERAL_SEQUENTIAL = 3;
const int ZOO_CONTAINER = 4;
const int ZOO_PERSISTENT_WITH_TTL = 5;
const int ZOO_PERSISTENT_SEQUENTIAL_WITH_TTL = 6;

/* Non-zero when `mode` is one of the three sequential creation modes. */
#define ZOOKEEPER_IS_SEQUENCE(mode) \
    ((mode) == ZOO_PERSISTENT_SEQUENTIAL || \
     (mode) == ZOO_EPHEMERAL_SEQUENTIAL || \
     (mode) == ZOO_PERSISTENT_SEQUENTIAL_WITH_TTL)
/* Non-zero when `mode` is one of the two TTL creation modes. */
#define ZOOKEEPER_IS_TTL(mode) \
    ((mode) == ZOO_PERSISTENT_WITH_TTL || \
     (mode) == ZOO_PERSISTENT_SEQUENTIAL_WITH_TTL)

// keep ZOO_SEQUENCE as a bitmask for compatibility reasons
// (ZOO_EPHEMERAL | ZOO_SEQUENCE == 3 == ZOO_EPHEMERAL_SEQUENTIAL, and
//  ZOO_PERSISTENT | ZOO_SEQUENCE == 2 == ZOO_PERSISTENT_SEQUENTIAL)
const int ZOO_SEQUENCE = 1 << 1;

/* Largest accepted TTL value: 2^40 - 1. Units are not visible here —
 * presumably milliseconds; confirm against the TTL validation code. */
#define ZOO_MAX_TTL 0xFFFFFFFFFFLL

/* Public handle-state constants; values come from the *_STATE_DEF macros
 * defined outside this file. */
const int ZOO_EXPIRED_SESSION_STATE = EXPIRED_SESSION_STATE_DEF;
const int ZOO_AUTH_FAILED_STATE = AUTH_FAILED_STATE_DEF;
const int ZOO_CONNECTING_STATE = CONNECTING_STATE_DEF;
const int ZOO_ASSOCIATING_STATE = ASSOCIATING_STATE_DEF;
const int ZOO_CONNECTED_STATE = CONNECTED_STATE_DEF;
const int ZOO_READONLY_STATE = READONLY_STATE_DEF;
const int ZOO_SSL_CONNECTING_STATE = SSL_CONNECTING_STATE_DEF;
const int ZOO_NOTCONNECTED_STATE = NOTCONNECTED_STATE_DEF;
139
state2String(int state)140 static __attribute__ ((unused)) const char* state2String(int state){
141 switch(state){
142 case 0:
143 return "ZOO_CLOSED_STATE";
144 case CONNECTING_STATE_DEF:
145 return "ZOO_CONNECTING_STATE";
146 case SSL_CONNECTING_STATE_DEF:
147 return "ZOO_SSL_CONNECTING_STATE";
148 case ASSOCIATING_STATE_DEF:
149 return "ZOO_ASSOCIATING_STATE";
150 case CONNECTED_STATE_DEF:
151 return "ZOO_CONNECTED_STATE";
152 case READONLY_STATE_DEF:
153 return "ZOO_READONLY_STATE";
154 case EXPIRED_SESSION_STATE_DEF:
155 return "ZOO_EXPIRED_SESSION_STATE";
156 case AUTH_FAILED_STATE_DEF:
157 return "ZOO_AUTH_FAILED_STATE";
158 }
159 return "INVALID_STATE";
160 }
161
/* Public watcher event-type constants; values come from the *_EVENT_DEF
 * macros defined outside this file. */
const int ZOO_CREATED_EVENT = CREATED_EVENT_DEF;
const int ZOO_DELETED_EVENT = DELETED_EVENT_DEF;
const int ZOO_CHANGED_EVENT = CHANGED_EVENT_DEF;
const int ZOO_CHILD_EVENT = CHILD_EVENT_DEF;
const int ZOO_SESSION_EVENT = SESSION_EVENT_DEF;
const int ZOO_NOTWATCHING_EVENT = NOTWATCHING_EVENT_DEF;
watcherEvent2String(int ev)168 static __attribute__ ((unused)) const char* watcherEvent2String(int ev){
169 switch(ev){
170 case 0:
171 return "ZOO_ERROR_EVENT";
172 case CREATED_EVENT_DEF:
173 return "ZOO_CREATED_EVENT";
174 case DELETED_EVENT_DEF:
175 return "ZOO_DELETED_EVENT";
176 case CHANGED_EVENT_DEF:
177 return "ZOO_CHANGED_EVENT";
178 case CHILD_EVENT_DEF:
179 return "ZOO_CHILD_EVENT";
180 case SESSION_EVENT_DEF:
181 return "ZOO_SESSION_EVENT";
182 case NOTWATCHING_EVENT_DEF:
183 return "ZOO_NOTWATCHING_EVENT";
184 }
185 return "INVALID_EVENT";
186 }
187
/* ACL permission bits; ZOO_PERM_ALL (0x1f) is the OR of all five. */
const int ZOO_PERM_READ = 1 << 0;
const int ZOO_PERM_WRITE = 1 << 1;
const int ZOO_PERM_CREATE = 1 << 2;
const int ZOO_PERM_DELETE = 1 << 3;
const int ZOO_PERM_ADMIN = 1 << 4;
const int ZOO_PERM_ALL = 0x1f;
/* Well-known ids: "world"/"anyone", and the "auth" scheme with an empty id
 * (presumably standing for the creator's authenticated identities —
 * confirm against the server documentation). */
struct Id ZOO_ANYONE_ID_UNSAFE = {"world", "anyone"};
struct Id ZOO_AUTH_IDS = {"auth", ""};
/* Backing arrays for the predefined ACL vectors below. The permission masks
 * are literals (0x1f == ZOO_PERM_ALL, 0x01 == ZOO_PERM_READ) because a
 * `const int` object is not a constant expression in C and so cannot be
 * used in a static initializer. */
static struct ACL _OPEN_ACL_UNSAFE_ACL[] = {{0x1f, {"world", "anyone"}}};
static struct ACL _READ_ACL_UNSAFE_ACL[] = {{0x01, {"world", "anyone"}}};
static struct ACL _CREATOR_ALL_ACL_ACL[] = {{0x1f, {"auth", ""}}};
/* Predefined single-entry ACL vectors: all permissions for anyone,
 * read-only for anyone, and all permissions for the "auth" id. */
struct ACL_vector ZOO_OPEN_ACL_UNSAFE = { 1, _OPEN_ACL_UNSAFE_ACL};
struct ACL_vector ZOO_READ_ACL_UNSAFE = { 1, _READ_ACL_UNSAFE_ACL};
struct ACL_vector ZOO_CREATOR_ALL_ACL = { 1, _CREATOR_ALL_ACL_ACL};
202
/*
 * Values stored in completion_t.type to record which kind of callback the
 * completion carries. Presumably each one pairs with the like-named member
 * of completion_t's union (e.g. COMPLETION_STAT -> stat_result,
 * COMPLETION_WATCH -> watcher_result) — confirm against the dispatch code.
 */
#define COMPLETION_WATCH -1
#define COMPLETION_VOID 0
#define COMPLETION_STAT 1
#define COMPLETION_DATA 2
#define COMPLETION_STRINGLIST 3
#define COMPLETION_STRINGLIST_STAT 4
#define COMPLETION_ACLLIST 5
#define COMPLETION_STRING 6
#define COMPLETION_MULTI 7
#define COMPLETION_STRING_STAT 8
213
/*
 * Singly linked list of auth completion callbacks, each paired with the auth
 * data handed back to it. The head node is used in place: a head whose
 * `completion` is NULL denotes an empty list (see add_auth_completion);
 * nodes after the head are heap-allocated (see free_auth_completion).
 */
typedef struct _auth_completion_list {
    void_completion_t completion;
    const char *auth_data;
    struct _auth_completion_list *next;
} auth_completion_list_t;
219
/*
 * A completion callback tagged with its kind. `type` records which member
 * of the anonymous union holds the callback (the dispatch code is not in
 * this part of the file). `clist` is used for multi-op requests.
 */
typedef struct completion {
    int type; /* one of COMPLETION_* values above */
    union {
        void_completion_t void_result;
        stat_completion_t stat_result;
        data_completion_t data_result;
        strings_completion_t strings_result;
        strings_stat_completion_t strings_stat_result;
        acl_completion_t acl_result;
        string_completion_t string_result;
        string_stat_completion_t string_stat_result;
        struct watcher_object_list *watcher_result;
    };
    completion_head_t clist; /* For multi-op */
} completion_t;
235
/*
 * Node of a singly linked list of pending completions.
 */
typedef struct _completion_list {
    int xid;                        /* request id; presumably matched against the reply's xid — confirm */
    completion_t c;                 /* the callback and its kind */
    const void *data;               /* caller-supplied pointer (presumably passed back to the callback) */
    buffer_list_t *buffer;
    struct _completion_list *next;  /* next pending completion, or NULL */
    watcher_registration_t* watcher;
    watcher_deregistration_t* watcher_deregistration;
} completion_list_t;
245
/* Forward declarations for helpers defined later in this file. */
const char*err2string(int err);
static inline int calculate_interval(const struct timeval *start,
        const struct timeval *end);
static int queue_session_event(zhandle_t *zh, int state);
static const char* format_endpoint_info(const struct sockaddr_storage* ep);

/* deserialize forward declarations */
static void deserialize_response(zhandle_t *zh, int type, int xid, int failed, int rc, completion_list_t *cptr, struct iarchive *ia);
static int deserialize_multi(zhandle_t *zh, int xid, completion_list_t *cptr, struct iarchive *ia);

/* completion routine forward declarations */
static int add_completion(zhandle_t *zh, int xid, int completion_type,
        const void *dc, const void *data, int add_to_front,
        watcher_registration_t* wo, completion_head_t *clist);
static int add_completion_deregistration(zhandle_t *zh, int xid,
        int completion_type, const void *dc, const void *data,
        int add_to_front, watcher_deregistration_t* wo,
        completion_head_t *clist);
static int do_add_completion(zhandle_t *zh, const void *dc, completion_list_t *c,
        int add_to_front);

static completion_list_t* create_completion_entry(zhandle_t *zh, int xid, int completion_type,
        const void *dc, const void *data, watcher_registration_t* wo,
        completion_head_t *clist);
static completion_list_t* create_completion_entry_deregistration(zhandle_t *zh,
        int xid, int completion_type, const void *dc, const void *data,
        watcher_deregistration_t* wo, completion_head_t *clist);
static completion_list_t* do_create_completion_entry(zhandle_t *zh,
        int xid, int completion_type, const void *dc, const void *data,
        watcher_registration_t* wo, completion_head_t *clist,
        watcher_deregistration_t* wdo);
static void destroy_completion_entry(completion_list_t* c);
static void queue_completion_nolock(completion_head_t *list, completion_list_t *c,
        int add_to_front);
static void queue_completion(completion_head_t *list, completion_list_t *c,
        int add_to_front);
static int handle_socket_error_msg(zhandle_t *zh, int line, int rc,
    const char* format,...);
static void cleanup_bufs(zhandle_t *zh,int callCompletion,int rc);

/* Non-zero disables shuffling of resolved server addresses (resolve_hosts
 * only shuffles when this is 0). */
static int disable_conn_permute=0; // permute enabled by default
/* Presumably the address of a known read-write server, remembered while
 * connected to a read-only one — confirm at use sites. */
static struct sockaddr_storage *addr_rw_server = 0;

/* A unique non-NULL sentinel pointer: initialized with its own address, so
 * it cannot compare equal to any other object pointer. Presumably used as a
 * completion's `dc`/`data` to mark synchronous requests — confirm. */
static void *SYNCHRONOUS_MARKER = (void*)&SYNCHRONOUS_MARKER;
static int isValidPath(const char* path, const int mode);
#ifdef HAVE_OPENSSL_H
static int init_ssl_for_handler(zhandle_t *zh);
static int init_ssl_for_socket(zsock_t *fd, zhandle_t *zh, int fail_on_error);
#endif

static int aremove_watches(
    zhandle_t *zh, const char *path, ZooWatcherType wtype,
    watcher_fn watcher, void *watcherCtx, int local,
    void_completion_t *completion, const void *data, int all);

#ifdef THREADED
static void process_sync_completion(zhandle_t *zh,
        completion_list_t *cptr,
        struct sync_completion *sc,
        struct iarchive *ia);

static int remove_watches(
    zhandle_t *zh, const char *path, ZooWatcherType wtype,
    watcher_fn watcher, void *watcherCtx, int local, int all);
#endif
311
/*
 * Platform glue for the socket layer: the native socket handle type, the
 * return type of send(), and the flags zookeeper_send() passes to send().
 */
#ifdef _WIN32
typedef SOCKET socket_t;
typedef int sendsize_t;
#define SEND_FLAGS 0
#else
#ifdef __APPLE__
/* NOTE(review): SO_NOSIGPIPE is a socket *option* (for setsockopt), not a
 * send() flag, so passing it in send()'s flags argument does not suppress
 * SIGPIPE. Consider SEND_FLAGS 0 here plus setsockopt(SO_NOSIGPIPE) when
 * the socket is created — confirm whether the socket setup already does. */
#define SEND_FLAGS SO_NOSIGPIPE
#endif
#ifdef __linux__
/* MSG_NOSIGNAL: writing to a peer-closed socket fails with EPIPE instead of
 * raising SIGPIPE. */
#define SEND_FLAGS MSG_NOSIGNAL
#endif
#ifndef SEND_FLAGS
#define SEND_FLAGS 0
#endif
typedef int socket_t;
typedef ssize_t sendsize_t;
#endif

/* Socket helpers defined later in this file. */
static void zookeeper_set_sock_nodelay(zhandle_t *, socket_t);
static void zookeeper_set_sock_noblock(zhandle_t *, socket_t);
static void zookeeper_set_sock_timeout(zhandle_t *, socket_t, int);
static socket_t zookeeper_connect(zhandle_t *, struct sockaddr_storage *, socket_t);
334
335 /*
336 * return 1 if zh has a SASL client configured, 0 otherwise.
337 */
has_sasl_client(zhandle_t * zh)338 static int has_sasl_client(zhandle_t* zh)
339 {
340 #ifdef HAVE_CYRUS_SASL_H
341 return zh->sasl_client != NULL;
342 #else /* !HAVE_CYRUS_SASL_H */
343 return 0;
344 #endif /* HAVE_CYRUS_SASL_H */
345 }
346
347 /*
348 * return 1 if zh has a SASL client performing authentication, 0 otherwise.
349 */
is_sasl_auth_in_progress(zhandle_t * zh)350 static int is_sasl_auth_in_progress(zhandle_t* zh)
351 {
352 #ifdef HAVE_CYRUS_SASL_H
353 return zh->sasl_client && zh->sasl_client->state == ZOO_SASL_INTERMEDIATE;
354 #else /* !HAVE_CYRUS_SASL_H */
355 return 0;
356 #endif /* HAVE_CYRUS_SASL_H */
357 }
358
359 /*
360 * Extract the type field (ZOO_*_OP) of a serialized RequestHeader.
361 *
362 * (This is not the most efficient way of fetching 4 bytes, but it is
363 * currently only used during SASL negotiation.)
364 *
365 * \param buffer the buffer to extract the request type from. Must
366 * start with a serialized RequestHeader;
367 * \param len the buffer length. Must be positive.
368 * \param out_type out parameter; pointer to the location where the
369 * extracted type is to be stored. Cannot be NULL.
370 * \return ZOK on success, or < 0 if something went wrong
371 */
extract_request_type(char * buffer,int len,int32_t * out_type)372 static int extract_request_type(char *buffer, int len, int32_t *out_type)
373 {
374 struct iarchive *ia;
375 struct RequestHeader h;
376 int rc;
377
378 ia = create_buffer_iarchive(buffer, len);
379 rc = ia ? ZOK : ZSYSTEMERROR;
380 rc = rc < 0 ? rc : deserialize_RequestHeader(ia, "header", &h);
381 deallocate_RequestHeader(&h);
382 if (ia) {
383 close_buffer_iarchive(&ia);
384 }
385
386 *out_type = h.type;
387
388 return rc;
389 }
390
391 #ifndef THREADED
392 /*
393 * abort due to the use of a sync api in a singlethreaded environment
394 */
abort_singlethreaded(zhandle_t * zh)395 static void abort_singlethreaded(zhandle_t *zh)
396 {
397 LOG_ERROR(LOGCALLBACK(zh), "Sync completion used without threads");
398 abort();
399 }
400 #endif /* THREADED */
401
/*
 * Write up to len bytes from buf to the connection. In OpenSSL builds a
 * socket with an attached TLS session goes through SSL_write(); otherwise a
 * plain send() with SEND_FLAGS is used. The underlying call's result is
 * returned unchanged.
 * NOTE(review): the TLS path narrows len to int.
 */
static ssize_t zookeeper_send(zsock_t *fd, const void* buf, size_t len)
{
    ssize_t written;
#ifdef HAVE_OPENSSL_H
    if (fd->ssl_sock != NULL) {
        written = (ssize_t)SSL_write(fd->ssl_sock, buf, (int)len);
        return written;
    }
#endif
    written = send(fd->sock, buf, len, SEND_FLAGS);
    return written;
}
410
/*
 * Read up to len bytes from the connection into buf. In OpenSSL builds a
 * socket with an attached TLS session goes through SSL_read(), and `flags`
 * is ignored on that path. Otherwise a plain recv() with `flags` is used.
 * The underlying call's result is returned unchanged.
 */
static ssize_t zookeeper_recv(zsock_t *fd, void *buf, size_t len, int flags)
{
    ssize_t received;
#ifdef HAVE_OPENSSL_H
    if (fd->ssl_sock != NULL) {
        received = (ssize_t)SSL_read(fd->ssl_sock, buf, (int)len);
        return received;
    }
#endif
    received = recv(fd->sock, buf, len, flags);
    return received;
}
419
/**
 * Get the system time.
 *
 * A monotonic clock is preferred because it does not jump when the
 * wall-clock time is adjusted by NTP or the system administrator. The
 * source depends on the platform:
 *   - OS X: the Mach SYSTEM_CLOCK service;
 *   - CLOCK_MONOTONIC_RAW where defined (Linux; not slewed by NTP);
 *   - otherwise CLOCK_MONOTONIC, when POSIX advertises it
 *     (_POSIX_MONOTONIC_CLOCK defined and >= 0);
 *   - Windows: QueryPerformanceCounter.
 * If the preferred clock is unavailable or fails at run time, we fall back
 * on the wall-clock (gettimeofday). If that fails too, the process aborts.
 *
 * @param tv (out param) The time.
 */
void get_system_time(struct timeval *tv)
{
    int ret;

#ifdef __MACH__ // OS X
    clock_serv_t cclock;
    mach_timespec_t mts;
    ret = host_get_clock_service(mach_host_self(), SYSTEM_CLOCK, &cclock);
    if (!ret) {
        ret += clock_get_time(cclock, &mts);
        ret += mach_port_deallocate(mach_task_self(), cclock);
        if (!ret) {
            tv->tv_sec = mts.tv_sec;
            tv->tv_usec = mts.tv_nsec / 1000;
        }
    }
    if (ret) {
        // Default to gettimeofday in case of failure.
        ret = gettimeofday(tv, NULL);
    }
#elif defined(CLOCK_MONOTONIC_RAW) || \
    (defined(_POSIX_MONOTONIC_CLOCK) && _POSIX_MONOTONIC_CLOCK >= 0)
    // _POSIX_MONOTONIC_CLOCK == -1 means "not supported", so it must not be
    // tested with a bare #if; 0 means "decide at run time", which the
    // gettimeofday fallback below covers.
    struct timespec ts = { 0 };
#ifdef CLOCK_MONOTONIC_RAW
    // On Linux, CLOCK_MONOTONIC is affected by ntp slew but CLOCK_MONOTONIC_RAW
    // is not. We want the non-slewed (constant rate) CLOCK_MONOTONIC_RAW if it
    // is available.
    ret = clock_gettime(CLOCK_MONOTONIC_RAW, &ts);
#else
    ret = clock_gettime(CLOCK_MONOTONIC, &ts);
#endif
    if (ret == 0) {
        tv->tv_sec = ts.tv_sec;
        tv->tv_usec = ts.tv_nsec / 1000;
    } else {
        // The clock id is compiled in but rejected at run time (e.g. an older
        // kernel); use the wall-clock like the other branches do.
        ret = gettimeofday(tv, NULL);
    }
#elif defined(_WIN32)
    LARGE_INTEGER counts, countsPerSecond;
    if (QueryPerformanceFrequency(&countsPerSecond) &&
        QueryPerformanceCounter(&counts)) {
        LONGLONG freq = countsPerSecond.QuadPart;
        tv->tv_sec = (long)(counts.QuadPart / freq);
        // Scale the sub-second remainder directly. Dividing the frequency by
        // 10^6 first truncates, and is a division by zero below 1 MHz.
        tv->tv_usec = (long)(((counts.QuadPart % freq) * 1000000) / freq);
        ret = 0;
    } else {
        ret = gettimeofday(tv, NULL);
    }
#else
    ret = gettimeofday(tv, NULL);
#endif
    if (ret) {
        abort();
    }
}
484
/*
 * Return the opaque user context stored in the handle (see zoo_set_context).
 * Like zoo_set_context, a NULL handle is tolerated: it yields NULL instead
 * of dereferencing the null pointer.
 */
const void *zoo_get_context(zhandle_t *zh)
{
    if (zh == NULL) {
        return NULL;
    }
    return zh->context;
}
489
/*
 * Store an opaque user context pointer in the handle; retrieve it later
 * with zoo_get_context. Does nothing when zh is NULL.
 */
void zoo_set_context(zhandle_t *zh, void *context)
{
    if (zh == NULL) {
        return;
    }
    zh->context = context;
}
496
/*
 * Report the receive timeout recorded in the handle (zh->recv_timeout).
 * zh must not be NULL.
 */
int zoo_recv_timeout(zhandle_t *zh)
{
    const int timeout = zh->recv_timeout;
    return timeout;
}
501
502 /** these functions are thread unsafe, so make sure that
503 zoo_lock_auth is called before you access them **/
get_last_auth(auth_list_head_t * auth_list)504 static auth_info* get_last_auth(auth_list_head_t *auth_list) {
505 auth_info *element;
506 element = auth_list->auth;
507 if (element == NULL) {
508 return NULL;
509 }
510 while (element->next != NULL) {
511 element = element->next;
512 }
513 return element;
514 }
515
/*
 * Free every node chained after the head of an auth completion list and
 * reset the head to the empty state (completion, auth_data and next all
 * NULL). The head node itself is not freed. A NULL list is ignored.
 */
static void free_auth_completion(auth_completion_list_t *a_list) {
    auth_completion_list_t *node;

    if (a_list == NULL) {
        return;
    }
    node = a_list->next;
    while (node != NULL) {
        auth_completion_list_t *doomed = node;
        node = node->next;
        doomed->completion = NULL;
        doomed->auth_data = NULL;
        free(doomed);
    }
    a_list->completion = NULL;
    a_list->auth_data = NULL;
    a_list->next = NULL;
}
534
/*
 * Append (*completion, data) to the auth completion list rooted at a_list.
 *
 * The head node is used in place: while its `completion` is NULL the list
 * is empty, and the head itself receives the first entry. Further entries
 * are heap-allocated, linked at the tail, and released by
 * free_auth_completion.
 *
 * If allocating a new node fails, the entry is dropped (its callback will
 * not be invoked); previously the NULL result was dereferenced.
 */
static void add_auth_completion(auth_completion_list_t* a_list, void_completion_t* completion,
                                const char *data) {
    auth_completion_list_t *tail;
    auth_completion_list_t *n_element;

    if (a_list->completion == NULL) {
        //this is the first element
        a_list->completion = *completion;
        a_list->next = NULL;
        a_list->auth_data = data;
        return;
    }

    n_element = malloc(sizeof(*n_element));
    if (n_element == NULL) {
        return;
    }
    n_element->next = NULL;
    n_element->completion = *completion;
    n_element->auth_data = data;

    tail = a_list;
    while (tail->next != NULL) {
        tail = tail->next;
    }
    tail->next = n_element;
}
557
/*
 * Collect every pending completion from the auth list into a_list, pairing
 * each with its auth_info's data. Each auth_info's completion is cleared
 * afterwards, so a callback is handed out only once.
 */
static void get_auth_completions(auth_list_head_t *auth_list, auth_completion_list_t *a_list) {
    auth_info *info;
    for (info = auth_list->auth; info != NULL; info = info->next) {
        if (info->completion != NULL) {
            add_auth_completion(a_list, &info->completion, info->data);
        }
        info->completion = NULL;
    }
}
573
/*
 * Link add_el at the end of the auth list; it becomes the head when the
 * list is empty. add_el->next is not modified.
 */
static void add_last_auth(auth_list_head_t *auth_list, auth_info *add_el) {
    auth_info **link = &auth_list->auth;
    while (*link != NULL) {
        link = &(*link)->next;
    }
    *link = add_el;
}
588
/*
 * Make the auth list head empty. Only the `auth` pointer is reset; any
 * nodes it pointed to are not freed here (see free_auth_info).
 */
static void init_auth_info(auth_list_head_t *auth_list)
{
    auth_info *const empty = NULL;
    auth_list->auth = empty;
}
593
/*
 * Set state = 1 on every entry of the handle's auth list. The list is
 * walked in place through zh->auth_h; the head is not copied.
 */
static void mark_active_auth(zhandle_t *zh) {
    auth_info *entry;
    for (entry = zh->auth_h.auth; entry != NULL; entry = entry->next) {
        entry->state = 1;
    }
}
606
/*
 * Free every node of the auth list, including each node's scheme string
 * and auth buffer, then reset the head to the empty list.
 */
static void free_auth_info(auth_list_head_t *auth_list)
{
    auth_info *cur = auth_list->auth;
    while (cur != NULL) {
        auth_info *next = cur->next;
        free(cur->scheme); /* free(NULL) is a no-op */
        deallocate_Buffer(&cur->auth);
        free(cur);
        cur = next;
    }
    init_auth_info(auth_list);
}
621
/*
 * Classify the handle's state: a negative state value is treated as
 * unrecoverable (ZINVALIDSTATE); any other value gives ZOK.
 */
int is_unrecoverable(zhandle_t *zh)
{
    if (zh->state < 0) {
        return ZINVALIDSTATE;
    }
    return ZOK;
}
626
exists_result_checker(zhandle_t * zh,int rc)627 zk_hashtable *exists_result_checker(zhandle_t *zh, int rc)
628 {
629 if (rc == ZOK) {
630 return zh->active_node_watchers;
631 } else if (rc == ZNONODE) {
632 return zh->active_exist_watchers;
633 }
634 return 0;
635 }
636
data_result_checker(zhandle_t * zh,int rc)637 zk_hashtable *data_result_checker(zhandle_t *zh, int rc)
638 {
639 return rc==ZOK ? zh->active_node_watchers : 0;
640 }
641
child_result_checker(zhandle_t * zh,int rc)642 zk_hashtable *child_result_checker(zhandle_t *zh, int rc)
643 {
644 return rc==ZOK ? zh->active_child_watchers : 0;
645 }
646
/*
 * Close the connection held by fd. In OpenSSL builds the TLS session and
 * its context are freed first. The socket is then closed and fd->sock is set
 * to -1, so calling this on an already-closed zsock does nothing.
 */
void close_zsock(zsock_t *fd)
{
    if (fd->sock == -1) {
        return;
    }
#ifdef HAVE_OPENSSL_H
    if (fd->ssl_sock != NULL) {
        SSL_free(fd->ssl_sock);
        SSL_CTX_free(fd->ssl_ctx);
        fd->ssl_sock = NULL;
        fd->ssl_ctx = NULL;
    }
#endif
    close(fd->sock);
    fd->sock = -1;
}
662
/**
 * Release the resources owned by a handle:
 *   - fail outstanding requests;
 *   - close the connection;
 *   - free the host and chroot strings, address vectors, auth list, watcher
 *     tables, and (when built with them) the TLS certificate record and SASL
 *     client.
 *
 * NOTE(review): nothing here frees the zhandle_t itself (nor zh->fd);
 * presumably the caller does that — confirm. A NULL handle is ignored.
 */
static void destroy(zhandle_t *zh)
{
    if (zh == NULL) {
        return;
    }
    /* call any outstanding completions with a special error code */
    cleanup_bufs(zh,1,ZCLOSING);
    /* Presumably delivers already-queued completions right away, since no
     * event loop will process them after this point — confirm. */
    if (process_async(zh->outstanding_sync)) {
        process_completions(zh);
    }
    if (zh->hostname != 0) {
        free(zh->hostname);
        zh->hostname = NULL;
    }
    /* If still connected: close the socket, forget the current address, and
     * reset the state to 0 (the value state2String reports as
     * ZOO_CLOSED_STATE). */
    if (zh->fd->sock != -1) {
        close_zsock(zh->fd);
        memset(&zh->addr_cur, 0, sizeof(zh->addr_cur));
        zh->state = 0;
    }
    addrvec_free(&zh->addrs);

    if (zh->chroot != NULL) {
        free(zh->chroot);
        zh->chroot = NULL;
    }
#ifdef HAVE_OPENSSL_H
    /* Free the `cert` record (and its certstr) hanging off the socket
     * wrapper. */
    if (zh->fd->cert) {
        free(zh->fd->cert->certstr);
        free(zh->fd->cert);
        zh->fd->cert = NULL;
    }
#endif
    free_auth_info(&zh->auth_h);
    destroy_zk_hashtable(zh->active_node_watchers);
    destroy_zk_hashtable(zh->active_exist_watchers);
    destroy_zk_hashtable(zh->active_child_watchers);
    addrvec_free(&zh->addrs_old);
    addrvec_free(&zh->addrs_new);

#ifdef HAVE_CYRUS_SASL_H
    if (zh->sasl_client) {
        zoo_sasl_client_destroy(zh->sasl_client);
        free(zh->sasl_client);
        zh->sasl_client = NULL;
    }
#endif /* HAVE_CYRUS_SASL_H */
}
714
/*
 * Seed the random() and drand48() PRNG families. resolve_hosts calls this
 * just before shuffling the resolved addresses.
 *
 * The seed starts as getpid() and is then overwritten with bytes from
 * /dev/urandom. If the device cannot be opened, or it reports EOF or a
 * non-EINTR error before sizeof(seed) bytes arrive, whatever is in `seed`
 * at that point (the getpid() value, possibly partially overwritten) is
 * used. No-op on Windows.
 */
static void setup_random()
{
#ifndef _WIN32 // TODO: better seed
    int seed = getpid();
    int fd = open("/dev/urandom", O_RDONLY);
    if (fd != -1) {
        size_t seed_len = 0;

        /* Loop so that short reads (e.g. a read interrupted by a signal)
         * are handled: keep filling the remaining bytes of `seed`. */
        while (seed_len < sizeof(seed)) {
            /* Byte offset into seed: `&seed + seed_len` would advance in
             * units of sizeof(int) and write past `seed` after a short read. */
            ssize_t rc = read(fd, (char *)&seed + seed_len,
                              sizeof(seed) - seed_len);
            if (rc > 0) {
                seed_len += (size_t)rc;
            } else if (rc == 0 || errno != EINTR) {
                /* EOF or a hard error: stop rather than spin forever. */
                break;
            }
            /* rc < 0 with EINTR: retry. */
        }
        close(fd);
    }
    srandom(seed);
    srand48(seed);
#endif
}
745
#ifndef __CYGWIN__
/*
 * Translate a getaddrinfo() return code into an errno value. getaddrinfo()
 * reports failures through its return value and does not set errno:
 *   EAI_NONAME (and EAI_NODATA, where it is a distinct code) -> ENOENT
 *   EAI_MEMORY                                               -> ENOMEM
 *   anything else                                            -> EINVAL
 */
static int getaddrinfo_errno(int rc) {
    int not_found = (rc == EAI_NONAME);
    // ZOOKEEPER-1323 EAI_NODATA and EAI_ADDRFAMILY are deprecated in FreeBSD.
#if defined EAI_NODATA && EAI_NODATA != EAI_NONAME
    not_found = not_found || (rc == EAI_NODATA);
#endif
    if (not_found) {
        return ENOENT;
    }
    return (rc == EAI_MEMORY) ? ENOMEM : EINVAL;
}
#endif
768
/**
 * Count the entries in a comma-separated connection host string, i.e. the
 * number of commas plus one. Returns 0 for NULL or an empty string. Empty
 * entries (",,", a trailing comma) are counted too, so the result is an
 * upper bound on the number of real hosts.
 */
static int count_hosts(char *hosts)
{
    uint32_t commas = 0;
    const char *p;

    if (hosts == NULL || hosts[0] == '\0') {
        return 0;
    }
    for (p = hosts; *p != '\0'; ++p) {
        if (*p == ',') {
            commas++;
        }
    }
    return commas + 1;
}
788
789 /**
790 * Resolve hosts and populate provided address vector with shuffled results.
791 * The contents of the provided address vector will be initialized to an
792 * empty state.
793 */
resolve_hosts(const zhandle_t * zh,const char * hosts_in,addrvec_t * avec)794 static int resolve_hosts(const zhandle_t *zh, const char *hosts_in, addrvec_t *avec)
795 {
796 int rc = ZOK;
797 char *host = NULL;
798 char *hosts = NULL;
799 int num_hosts = 0;
800 char *strtok_last = NULL;
801
802 if (zh == NULL || hosts_in == NULL || avec == NULL) {
803 return ZBADARGUMENTS;
804 }
805
806 // initialize address vector
807 addrvec_init(avec);
808
809 hosts = strdup(hosts_in);
810 if (hosts == NULL) {
811 LOG_ERROR(LOGCALLBACK(zh), "out of memory");
812 errno=ENOMEM;
813 rc=ZSYSTEMERROR;
814 goto fail;
815 }
816
817 num_hosts = count_hosts(hosts);
818 if (num_hosts == 0) {
819 free(hosts);
820 return ZOK;
821 }
822
823 // Allocate list inside avec
824 rc = addrvec_alloc_capacity(avec, num_hosts);
825 if (rc != 0) {
826 LOG_ERROR(LOGCALLBACK(zh), "out of memory");
827 errno=ENOMEM;
828 rc=ZSYSTEMERROR;
829 goto fail;
830 }
831
832 host = strtok_r(hosts, ",", &strtok_last);
833 while(host) {
834 char *port_spec = strrchr(host, ':');
835 char *end_port_spec;
836 int port;
837 if (!port_spec) {
838 LOG_ERROR(LOGCALLBACK(zh), "no port in %s", host);
839 errno=EINVAL;
840 rc=ZBADARGUMENTS;
841 goto fail;
842 }
843 *port_spec = '\0';
844 port_spec++;
845 port = strtol(port_spec, &end_port_spec, 0);
846 if (!*port_spec || *end_port_spec || port == 0) {
847 LOG_ERROR(LOGCALLBACK(zh), "invalid port in %s", host);
848 errno=EINVAL;
849 rc=ZBADARGUMENTS;
850 goto fail;
851 }
852 #if defined(__CYGWIN__)
853 // sadly CYGWIN doesn't have getaddrinfo
854 // but happily gethostbyname is threadsafe in windows
855 {
856 struct hostent *he;
857 char **ptr;
858 struct sockaddr_in *addr4;
859
860 he = gethostbyname(host);
861 if (!he) {
862 LOG_ERROR(LOGCALLBACK(zh), "could not resolve %s", host);
863 errno=ENOENT;
864 rc=ZBADARGUMENTS;
865 goto fail;
866 }
867
868 // Setup the address array
869 for(ptr = he->h_addr_list;*ptr != 0; ptr++) {
870 if (addrs->count == addrs->capacity) {
871 rc = addrvec_grow_default(addrs);
872 if (rc != 0) {
873 LOG_ERROR(LOGCALLBACK(zh), "out of memory");
874 errno=ENOMEM;
875 rc=ZSYSTEMERROR;
876 goto fail;
877 }
878 }
879 addr = &addrs->list[addrs->count];
880 addr4 = (struct sockaddr_in*)addr;
881 addr->ss_family = he->h_addrtype;
882 if (addr->ss_family == AF_INET) {
883 addr4->sin_port = htons(port);
884 memset(&addr4->sin_zero, 0, sizeof(addr4->sin_zero));
885 memcpy(&addr4->sin_addr, *ptr, he->h_length);
886 zh->addrs.count++;
887 }
888 #if defined(AF_INET6)
889 else if (addr->ss_family == AF_INET6) {
890 struct sockaddr_in6 *addr6;
891
892 addr6 = (struct sockaddr_in6*)addr;
893 addr6->sin6_port = htons(port);
894 addr6->sin6_scope_id = 0;
895 addr6->sin6_flowinfo = 0;
896 memcpy(&addr6->sin6_addr, *ptr, he->h_length);
897 zh->addrs.count++;
898 }
899 #endif
900 else {
901 LOG_WARN(LOGCALLBACK(zh), "skipping unknown address family %x for %s",
902 addr->ss_family, hosts_in);
903 }
904 }
905 host = strtok_r(0, ",", &strtok_last);
906 }
907 #else
908 {
909 struct addrinfo hints, *res, *res0;
910
911 memset(&hints, 0, sizeof(hints));
912 #ifdef AI_ADDRCONFIG
913 hints.ai_flags = AI_ADDRCONFIG;
914 #else
915 hints.ai_flags = 0;
916 #endif
917 hints.ai_family = AF_UNSPEC;
918 hints.ai_socktype = SOCK_STREAM;
919 hints.ai_protocol = IPPROTO_TCP;
920
921 while(isspace(*host) && host != strtok_last)
922 host++;
923
924 if ((rc = getaddrinfo(host, port_spec, &hints, &res0)) != 0) {
925 //bug in getaddrinfo implementation when it returns
926 //EAI_BADFLAGS or EAI_ADDRFAMILY with AF_UNSPEC and
927 // ai_flags as AI_ADDRCONFIG
928 #ifdef AI_ADDRCONFIG
929 if ((hints.ai_flags == AI_ADDRCONFIG) &&
930 // ZOOKEEPER-1323 EAI_NODATA and EAI_ADDRFAMILY are deprecated in FreeBSD.
931 #ifdef EAI_ADDRFAMILY
932 ((rc ==EAI_BADFLAGS) || (rc == EAI_ADDRFAMILY))) {
933 #else
934 (rc == EAI_BADFLAGS)) {
935 #endif
936 //reset ai_flags to null
937 hints.ai_flags = 0;
938 //retry getaddrinfo
939 rc = getaddrinfo(host, port_spec, &hints, &res0);
940 }
941 #endif
942 if (rc != 0) {
943 errno = getaddrinfo_errno(rc);
944 #ifdef _WIN32
945 LOG_ERROR(LOGCALLBACK(zh), "Win32 message: %s\n", gai_strerror(rc));
946 #elif __linux__ && __GNUC__
947 LOG_ERROR(LOGCALLBACK(zh), "getaddrinfo: %s\n", gai_strerror(rc));
948 #else
949 LOG_ERROR(LOGCALLBACK(zh), "getaddrinfo: %s\n", strerror(errno));
950 #endif
951 rc=ZSYSTEMERROR;
952 goto next;
953 }
954 }
955
956 for (res = res0; res; res = res->ai_next) {
957 // Expand address list if needed
958 if (avec->count == avec->capacity) {
959 rc = addrvec_grow_default(avec);
960 if (rc != 0) {
961 LOG_ERROR(LOGCALLBACK(zh), "out of memory");
962 errno=ENOMEM;
963 rc=ZSYSTEMERROR;
964 goto fail;
965 }
966 }
967
968 // Copy addrinfo into address list
969 switch (res->ai_family) {
970 case AF_INET:
971 #if defined(AF_INET6)
972 case AF_INET6:
973 #endif
974 addrvec_append_addrinfo(avec, res);
975 break;
976 default:
977 LOG_WARN(LOGCALLBACK(zh), "skipping unknown address family %x for %s",
978 res->ai_family, hosts_in);
979 break;
980 }
981 }
982
983 freeaddrinfo(res0);
984 next:
985 host = strtok_r(0, ",", &strtok_last);
986 }
987 #endif
988 }
989 if (avec->count == 0) {
990 rc = ZSYSTEMERROR; // not a single host resolved
991 goto fail;
992 }
993
994 free(hosts);
995
996 if(!disable_conn_permute){
997 setup_random();
998 addrvec_shuffle(avec);
999 }
1000
1001 return ZOK;
1002
1003 fail:
1004 addrvec_free(avec);
1005
1006 if (hosts) {
1007 free(hosts);
1008 hosts = NULL;
1009 }
1010
1011 return rc;
1012 }
1013
1014 /**
1015 * Updates the list of servers and determine if changing connections is necessary.
1016 * Permutes server list for proper load balancing.
1017 *
1018 * Changing connections is necessary if one of the following holds:
1019 * a) the server this client is currently connected is not in new address list.
1020 * Otherwise (if currentHost is in the new list):
1021 * b) the number of servers in the cluster is increasing - in this case the load
1022 * on currentHost should decrease, which means that SOME of the clients
1023 * connected to it will migrate to the new servers. The decision whether this
1024 * client migrates or not is probabilistic so that the expected number of
1025 * clients connected to each server is the same.
1026 *
1027 * If reconfig is set to true, the function sets pOld and pNew that correspond
1028 * to the probability to migrate to ones of the new servers or one of the old
1029 * servers (migrating to one of the old servers is done only if our client's
1030 * currentHost is not in new list).
1031 *
1032 * See zoo_cycle_next_server for the selection logic.
1033 *
1034 * \param ref_time an optional "reference time," used to determine if
1035 * resolution can be skipped in accordance to the delay set by \ref
1036 * zoo_set_servers_resolution_delay. Passing NULL prevents skipping.
1037 *
1038 * See {@link https://issues.apache.org/jira/browse/ZOOKEEPER-1355} for the
1039 * protocol and its evaluation,
1040 */
1041 int update_addrs(zhandle_t *zh, const struct timeval *ref_time)
1042 {
1043 int rc = ZOK;
1044 char *hosts = NULL;
1045 uint32_t num_old = 0;
1046 uint32_t num_new = 0;
1047 uint32_t i = 0;
1048 int found_current = 0;
1049 addrvec_t resolved = { 0 };
1050
1051 // Verify we have a valid handle
1052 if (zh == NULL) {
1053 return ZBADARGUMENTS;
1054 }
1055
1056 // zh->hostname should always be set
1057 if (zh->hostname == NULL)
1058 {
1059 return ZSYSTEMERROR;
1060 }
1061
1062 // NOTE: guard access to {hostname, addr_cur, addrs, addrs_old, addrs_new, last_resolve, resolve_delay_ms}
1063 lock_reconfig(zh);
1064
1065 // Check if we are due for a host name resolution. (See
1066 // zoo_set_servers_resolution_delay. The answer is always "yes"
1067 // if no reference is provided or the file descriptor is invalid.)
1068 if (ref_time && zh->fd->sock != -1) {
1069 int do_resolve;
1070
1071 if (zh->resolve_delay_ms <= 0) {
1072 // -1 disables, 0 means unconditional. Fail safe.
1073 do_resolve = zh->resolve_delay_ms != -1;
1074 } else {
1075 int elapsed_ms = calculate_interval(&zh->last_resolve, ref_time);
1076 // Include < 0 in case of overflow, or if we are not
1077 // backed by a monotonic clock.
1078 do_resolve = elapsed_ms > zh->resolve_delay_ms || elapsed_ms < 0;
1079 }
1080
1081 if (!do_resolve) {
1082 goto finish;
1083 }
1084 }
1085
1086 // Copy zh->hostname for local use
1087 hosts = strdup(zh->hostname);
1088 if (hosts == NULL) {
1089 rc = ZSYSTEMERROR;
1090 goto finish;
1091 }
1092
1093 rc = resolve_hosts(zh, hosts, &resolved);
1094 if (rc != ZOK)
1095 {
1096 goto finish;
1097 }
1098
1099 // Unconditionally note last resolution time.
1100 if (ref_time) {
1101 zh->last_resolve = *ref_time;
1102 } else {
1103 get_system_time(&zh->last_resolve);
1104 }
1105
1106 // If the addrvec list is identical to last time we ran don't do anything
1107 if (addrvec_eq(&zh->addrs, &resolved))
1108 {
1109 goto finish;
1110 }
1111
1112 // Is the server we're connected to in the new resolved list?
1113 found_current = addrvec_contains(&resolved, &zh->addr_cur);
1114
1115 // Clear out old and new address lists
1116 zh->reconfig = 1;
1117 addrvec_free(&zh->addrs_old);
1118 addrvec_free(&zh->addrs_new);
1119
1120 // Divide server list into addrs_old if in previous list and addrs_new if not
1121 for (i = 0; i < resolved.count; i++)
1122 {
1123 struct sockaddr_storage *resolved_address = &resolved.data[i];
1124 if (addrvec_contains(&zh->addrs, resolved_address))
1125 {
1126 rc = addrvec_append(&zh->addrs_old, resolved_address);
1127 if (rc != ZOK)
1128 {
1129 goto finish;
1130 }
1131 }
1132 else {
1133 rc = addrvec_append(&zh->addrs_new, resolved_address);
1134 if (rc != ZOK)
1135 {
1136 goto finish;
1137 }
1138 }
1139 }
1140
1141 num_old = zh->addrs_old.count;
1142 num_new = zh->addrs_new.count;
1143
1144 // Number of servers increased
1145 if (num_old + num_new > zh->addrs.count)
1146 {
1147 if (found_current) {
1148 // my server is in the new config, but load should be decreased.
1149 // Need to decide if the client is moving to one of the new servers
1150 if (drand48() <= (1 - ((double)zh->addrs.count) / (num_old + num_new))) {
1151 zh->pNew = 1;
1152 zh->pOld = 0;
1153 } else {
1154 // do nothing special -- stay with the current server
1155 zh->reconfig = 0;
1156 }
1157 } else {
1158 // my server is not in the new config, and load on old servers must
1159 // be decreased, so connect to one of the new servers
1160 zh->pNew = 1;
1161 zh->pOld = 0;
1162 }
1163 }
1164
1165 // Number of servers stayed the same or decreased
1166 else {
1167 if (found_current) {
1168 // my server is in the new config, and load should be increased, so
1169 // stay with this server and do nothing special
1170 zh->reconfig = 0;
1171 } else {
1172 zh->pOld = ((double) (num_old * (zh->addrs.count - (num_old + num_new)))) / ((num_old + num_new) * (zh->addrs.count - num_old));
1173 zh->pNew = 1 - zh->pOld;
1174 }
1175 }
1176
1177 addrvec_free(&zh->addrs);
1178 zh->addrs = resolved;
1179
1180 // If we need to do a reconfig and we're currently connected to a server,
1181 // then force close that connection so on next interest() call we'll make a
1182 // new connection
1183 if (zh->reconfig == 1 && zh->fd->sock != -1)
1184 {
1185 close_zsock(zh->fd);
1186 zh->state = ZOO_NOTCONNECTED_STATE;
1187 }
1188
1189 finish:
1190
1191 unlock_reconfig(zh);
1192
1193 // If we short-circuited out and never assigned resolved to zh->addrs then we
1194 // need to free resolved to avoid a memleak.
1195 if (resolved.data && zh->addrs.data != resolved.data)
1196 {
1197 addrvec_free(&resolved);
1198 }
1199
1200 if (hosts) {
1201 free(hosts);
1202 hosts = NULL;
1203 }
1204
1205 return rc;
1206 }
1207
1208 const clientid_t *zoo_client_id(zhandle_t *zh)
1209 {
1210 return &zh->client_id;
1211 }
1212
1213 static void null_watcher_fn(zhandle_t* p1, int p2, int p3,const char* p4,void*p5){}
1214
1215 watcher_fn zoo_set_watcher(zhandle_t *zh,watcher_fn newFn)
1216 {
1217 watcher_fn oldWatcher=zh->watcher;
1218 if (newFn) {
1219 zh->watcher = newFn;
1220 } else {
1221 zh->watcher = null_watcher_fn;
1222 }
1223 return oldWatcher;
1224 }
1225
1226 struct sockaddr* zookeeper_get_connected_host(zhandle_t *zh,
1227 struct sockaddr *addr, socklen_t *addr_len)
1228 {
1229 if (zh->state!=ZOO_CONNECTED_STATE) {
1230 return NULL;
1231 }
1232 if (getpeername(zh->fd->sock, addr, addr_len)==-1) {
1233 return NULL;
1234 }
1235 return addr;
1236 }
1237
1238 static void log_env(zhandle_t *zh) {
1239 char buf[2048];
1240 #ifdef HAVE_SYS_UTSNAME_H
1241 struct utsname utsname;
1242 #endif
1243
1244 #if defined(HAVE_GETUID) && defined(HAVE_GETPWUID_R)
1245 struct passwd pw;
1246 struct passwd *pwp = NULL;
1247 uid_t uid = 0;
1248 #endif
1249
1250 LOG_INFO(LOGCALLBACK(zh), "Client environment:zookeeper.version=%s", PACKAGE_STRING);
1251
1252 #ifdef HAVE_GETHOSTNAME
1253 gethostname(buf, sizeof(buf));
1254 LOG_INFO(LOGCALLBACK(zh), "Client environment:host.name=%s", buf);
1255 #else
1256 LOG_INFO(LOGCALLBACK(zh), "Client environment:host.name=<not implemented>");
1257 #endif
1258
1259 #ifdef HAVE_SYS_UTSNAME_H
1260 uname(&utsname);
1261 LOG_INFO(LOGCALLBACK(zh), "Client environment:os.name=%s", utsname.sysname);
1262 LOG_INFO(LOGCALLBACK(zh), "Client environment:os.arch=%s", utsname.release);
1263 LOG_INFO(LOGCALLBACK(zh), "Client environment:os.version=%s", utsname.version);
1264 #else
1265 LOG_INFO(LOGCALLBACK(zh), "Client environment:os.name=<not implemented>");
1266 LOG_INFO(LOGCALLBACK(zh), "Client environment:os.arch=<not implemented>");
1267 LOG_INFO(LOGCALLBACK(zh), "Client environment:os.version=<not implemented>");
1268 #endif
1269
1270 #ifdef HAVE_GETLOGIN
1271 LOG_INFO(LOGCALLBACK(zh), "Client environment:user.name=%s", getlogin());
1272 #else
1273 LOG_INFO(LOGCALLBACK(zh), "Client environment:user.name=<not implemented>");
1274 #endif
1275
1276 #if defined(HAVE_GETUID) && defined(HAVE_GETPWUID_R)
1277 uid = getuid();
1278 if (!getpwuid_r(uid, &pw, buf, sizeof(buf), &pwp) && pwp) {
1279 LOG_INFO(LOGCALLBACK(zh), "Client environment:user.home=%s", pw.pw_dir);
1280 } else {
1281 LOG_INFO(LOGCALLBACK(zh), "Client environment:user.home=<NA>");
1282 }
1283 #else
1284 LOG_INFO(LOGCALLBACK(zh), "Client environment:user.home=<not implemented>");
1285 #endif
1286
1287 #ifdef HAVE_GETCWD
1288 if (!getcwd(buf, sizeof(buf))) {
1289 LOG_INFO(LOGCALLBACK(zh), "Client environment:user.dir=<toolong>");
1290 } else {
1291 LOG_INFO(LOGCALLBACK(zh), "Client environment:user.dir=%s", buf);
1292 }
1293 #else
1294 LOG_INFO(LOGCALLBACK(zh), "Client environment:user.dir=<not implemented>");
1295 #endif
1296 }
1297
1298 /**
1299 * Create a zookeeper handle associated with the given host and port.
1300 */
1301 static zhandle_t *zookeeper_init_internal(const char *host, watcher_fn watcher,
1302 int recv_timeout, const clientid_t *clientid, void *context, int flags,
1303 log_callback_fn log_callback, zcert_t *cert, void *sasl_params)
1304 {
1305 int errnosave = 0;
1306 zhandle_t *zh = NULL;
1307 char *index_chroot = NULL;
1308
1309 // Create our handle
1310 zh = calloc(1, sizeof(*zh));
1311 if (!zh) {
1312 return 0;
1313 }
1314
1315 // Set log callback before calling into log_env
1316 zh->log_callback = log_callback;
1317
1318 if (!(flags & ZOO_NO_LOG_CLIENTENV)) {
1319 log_env(zh);
1320 }
1321
1322 zh->fd = calloc(1, sizeof(zsock_t));
1323 zh->fd->sock = -1;
1324 if (cert) {
1325 zh->fd->cert = calloc(1, sizeof(zcert_t));
1326 memcpy(zh->fd->cert, cert, sizeof(zcert_t));
1327 }
1328
1329 #ifdef _WIN32
1330 if (Win32WSAStartup()){
1331 LOG_ERROR(LOGCALLBACK(zh), "Error initializing ws2_32.dll");
1332 return 0;
1333 }
1334 #endif
1335 LOG_INFO(LOGCALLBACK(zh), "Initiating client connection, host=%s sessionTimeout=%d watcher=%p"
1336 " sessionId=%#llx sessionPasswd=%s context=%p flags=%d",
1337 host,
1338 recv_timeout,
1339 watcher,
1340 (clientid == 0 ? 0 : clientid->client_id),
1341 ((clientid == 0) || (clientid->passwd[0] == 0) ?
1342 "<null>" : "<hidden>"),
1343 context,
1344 flags);
1345
1346 zh->hostname = NULL;
1347 zh->state = ZOO_NOTCONNECTED_STATE;
1348 zh->context = context;
1349 zh->recv_timeout = recv_timeout;
1350 zh->allow_read_only = flags & ZOO_READONLY;
1351 // non-zero clientid implies we've seen r/w server already
1352 zh->seen_rw_server_before = (clientid != 0 && clientid->client_id != 0);
1353 init_auth_info(&zh->auth_h);
1354 if (watcher) {
1355 zh->watcher = watcher;
1356 } else {
1357 zh->watcher = null_watcher_fn;
1358 }
1359 if (host == 0 || *host == 0) { // what we shouldn't dup
1360 errno=EINVAL;
1361 goto abort;
1362 }
1363 //parse the host to get the chroot if available
1364 index_chroot = strchr(host, '/');
1365 if (index_chroot) {
1366 zh->chroot = strdup(index_chroot);
1367 if (zh->chroot == NULL) {
1368 goto abort;
1369 }
1370 // if chroot is just / set it to null
1371 if (strlen(zh->chroot) == 1) {
1372 free(zh->chroot);
1373 zh->chroot = NULL;
1374 }
1375 // cannot use strndup so allocate and strcpy
1376 zh->hostname = (char *) malloc(index_chroot - host + 1);
1377 zh->hostname = strncpy(zh->hostname, host, (index_chroot - host));
1378 //strncpy does not null terminate
1379 *(zh->hostname + (index_chroot - host)) = '\0';
1380
1381 } else {
1382 zh->chroot = NULL;
1383 zh->hostname = strdup(host);
1384 }
1385 if (zh->chroot && !isValidPath(zh->chroot, 0)) {
1386 errno = EINVAL;
1387 goto abort;
1388 }
1389 if (zh->hostname == 0) {
1390 goto abort;
1391 }
1392 if(update_addrs(zh, NULL) != 0) {
1393 goto abort;
1394 }
1395
1396 if (clientid) {
1397 memcpy(&zh->client_id, clientid, sizeof(zh->client_id));
1398 } else {
1399 memset(&zh->client_id, 0, sizeof(zh->client_id));
1400 }
1401 zh->io_count = 0;
1402 zh->primer_buffer.buffer = zh->primer_storage_buffer;
1403 zh->primer_buffer.curr_offset = 0;
1404 zh->primer_buffer.len = sizeof(zh->primer_storage_buffer);
1405 zh->primer_buffer.next = 0;
1406 zh->last_zxid = 0;
1407 zh->next_deadline.tv_sec=zh->next_deadline.tv_usec=0;
1408 zh->socket_readable.tv_sec=zh->socket_readable.tv_usec=0;
1409 zh->active_node_watchers=create_zk_hashtable();
1410 zh->active_exist_watchers=create_zk_hashtable();
1411 zh->active_child_watchers=create_zk_hashtable();
1412 zh->disable_reconnection_attempt = 0;
1413
1414 #ifdef HAVE_CYRUS_SASL_H
1415 if (sasl_params) {
1416 zh->sasl_client = zoo_sasl_client_create(
1417 (zoo_sasl_params_t*)sasl_params);
1418 if (!zh->sasl_client) {
1419 goto abort;
1420 }
1421 }
1422 #endif /* HAVE_CYRUS_SASL_H */
1423
1424 if (adaptor_init(zh) == -1) {
1425 goto abort;
1426 }
1427
1428 return zh;
1429 abort:
1430 errnosave=errno;
1431 destroy(zh);
1432 free(zh->fd);
1433 free(zh);
1434 errno=errnosave;
1435 return 0;
1436 }
1437
1438 zhandle_t *zookeeper_init(const char *host, watcher_fn watcher,
1439 int recv_timeout, const clientid_t *clientid, void *context, int flags)
1440 {
1441 return zookeeper_init_internal(host, watcher, recv_timeout, clientid, context, flags, NULL, NULL, NULL);
1442 }
1443
1444 zhandle_t *zookeeper_init2(const char *host, watcher_fn watcher,
1445 int recv_timeout, const clientid_t *clientid, void *context, int flags,
1446 log_callback_fn log_callback)
1447 {
1448 return zookeeper_init_internal(host, watcher, recv_timeout, clientid, context, flags, log_callback, NULL, NULL);
1449 }
1450
#ifdef HAVE_OPENSSL_H
/*
 * Creates a handle that connects over TLS.
 *
 * \param cert comma-separated "ca,cert,key,passwd" description. The full
 *   string is kept in zcert.certstr, and a second copy is tokenized in place
 *   to produce the individual fields.
 * \return the new handle, or 0 with errno set (EINVAL for a NULL cert,
 *   ENOMEM if the copies cannot be made) on failure.
 */
zhandle_t *zookeeper_init_ssl(const char *host, const char *cert, watcher_fn watcher,
        int recv_timeout, const clientid_t *clientid, void *context, int flags)
{
    zcert_t zcert;
    char *fields = NULL;

    if (cert == NULL) {
        errno = EINVAL;
        return 0;
    }

    memset(&zcert, 0, sizeof(zcert));
    zcert.certstr = strdup(cert);
    fields = strdup(cert);
    if (zcert.certstr == NULL || fields == NULL) {
        free(zcert.certstr);
        free(fields);
        errno = ENOMEM;
        return 0;
    }

    // NOTE(review): strtok keeps hidden static state and is not thread-safe;
    // concurrent callers could interleave. The tokenized copy is referenced
    // by the zcert fields and is not freed here; confirm who owns it.
    zcert.ca = strtok(fields, ",");
    zcert.cert = strtok(NULL, ",");
    zcert.key = strtok(NULL, ",");
    zcert.passwd = strtok(NULL, ",");
    return zookeeper_init_internal(host, watcher, recv_timeout, clientid, context, flags, NULL, &zcert, NULL);
}
#endif
1464
#ifdef HAVE_CYRUS_SASL_H
/*
 * Creates a handle that authenticates with SASL using sasl_params.
 * No TLS certificate is used.
 */
zhandle_t *zookeeper_init_sasl(const char *host, watcher_fn watcher,
        int recv_timeout, const clientid_t *clientid, void *context, int flags,
        log_callback_fn log_callback, zoo_sasl_params_t *sasl_params)
{
    zcert_t *no_cert = NULL;

    return zookeeper_init_internal(host, watcher, recv_timeout, clientid,
                                   context, flags, log_callback, no_cert,
                                   sasl_params);
}
#endif /* HAVE_CYRUS_SASL_H */
1473
1474 /**
1475 * Set a new list of zk servers to connect to. Disconnect will occur if
1476 * current connection endpoint is not in the list.
1477 */
1478 int zoo_set_servers(zhandle_t *zh, const char *hosts)
1479 {
1480 if (hosts == NULL)
1481 {
1482 LOG_ERROR(LOGCALLBACK(zh), "New server list cannot be empty");
1483 return ZBADARGUMENTS;
1484 }
1485
1486 // NOTE: guard access to {hostname, addr_cur, addrs, addrs_old, addrs_new, last_resolve, resolve_delay_ms}
1487 lock_reconfig(zh);
1488
1489 // Reset hostname to new set of hosts to connect to
1490 if (zh->hostname) {
1491 free(zh->hostname);
1492 }
1493
1494 zh->hostname = strdup(hosts);
1495
1496 unlock_reconfig(zh);
1497
1498 return update_addrs(zh, NULL);
1499 }
1500
1501 /*
1502 * Sets a minimum delay to observe between "routine" host name
1503 * resolutions. See prototype for full documentation.
1504 */
1505 int zoo_set_servers_resolution_delay(zhandle_t *zh, int delay_ms) {
1506 if (delay_ms < -1) {
1507 LOG_ERROR(LOGCALLBACK(zh), "Resolution delay cannot be %d", delay_ms);
1508 return ZBADARGUMENTS;
1509 }
1510
1511 // NOTE: guard access to {hostname, addr_cur, addrs, addrs_old, addrs_new, last_resolve, resolve_delay_ms}
1512 lock_reconfig(zh);
1513
1514 zh->resolve_delay_ms = delay_ms;
1515
1516 unlock_reconfig(zh);
1517
1518 return ZOK;
1519 }
1520
1521 /**
1522 * Get the next server to connect to, when in 'reconfig' mode, which means that
1523 * we've updated the server list to connect to, and are now trying to find some
1524 * server to connect to. Once we get successfully connected, 'reconfig' mode is
1525 * set to false. Similarly, if we tried to connect to all servers in new config
1526 * and failed, 'reconfig' mode is set to false.
1527 *
1528 * While in 'reconfig' mode, we should connect to a server in the new set of
1529 * servers (addrs_new) with probability pNew and to servers in the old set of
1530 * servers (addrs_old) with probability pOld (which is just 1-pNew). If we tried
1531 * out all servers in either, we continue to try servers from the other set,
1532 * regardless of pNew or pOld. If we tried all servers we give up and go back to
1533 * the normal round robin mode
1534 *
1535 * When called, must be protected by lock_reconfig(zh).
1536 */
1537 static int get_next_server_in_reconfig(zhandle_t *zh)
1538 {
1539 int take_new = drand48() <= zh->pNew;
1540
1541 LOG_DEBUG(LOGCALLBACK(zh), "[OLD] count=%d capacity=%d next=%d hasnext=%d",
1542 zh->addrs_old.count, zh->addrs_old.capacity, zh->addrs_old.next,
1543 addrvec_hasnext(&zh->addrs_old));
1544 LOG_DEBUG(LOGCALLBACK(zh), "[NEW] count=%d capacity=%d next=%d hasnext=%d",
1545 zh->addrs_new.count, zh->addrs_new.capacity, zh->addrs_new.next,
1546 addrvec_hasnext(&zh->addrs_new));
1547
1548 // Take one of the new servers if we haven't tried them all yet
1549 // and either the probability tells us to connect to one of the new servers
1550 // or if we already tried them all then use one of the old servers
1551 if (addrvec_hasnext(&zh->addrs_new)
1552 && (take_new || !addrvec_hasnext(&zh->addrs_old)))
1553 {
1554 addrvec_next(&zh->addrs_new, &zh->addr_cur);
1555 LOG_DEBUG(LOGCALLBACK(zh), "Using next from NEW=%s", format_endpoint_info(&zh->addr_cur));
1556 return 0;
1557 }
1558
1559 // start taking old servers
1560 if (addrvec_hasnext(&zh->addrs_old)) {
1561 addrvec_next(&zh->addrs_old, &zh->addr_cur);
1562 LOG_DEBUG(LOGCALLBACK(zh), "Using next from OLD=%s", format_endpoint_info(&zh->addr_cur));
1563 return 0;
1564 }
1565
1566 LOG_DEBUG(LOGCALLBACK(zh), "Failed to find either new or old");
1567 memset(&zh->addr_cur, 0, sizeof(zh->addr_cur));
1568 return 1;
1569 }
1570
1571 /**
1572 * Cycle through our server list to the correct 'next' server. The 'next' server
1573 * to connect to depends upon whether we're in a 'reconfig' mode or not. Reconfig
1574 * mode means we've upated the server list and are now trying to find a server
1575 * to connect to. Once we get connected, we are no longer in the reconfig mode.
1576 * Similarly, if we try to connect to all the servers in the new configuration
1577 * and failed, reconfig mode is set to false.
1578 *
1579 * For more algorithm details, see get_next_server_in_reconfig.
1580 */
1581 void zoo_cycle_next_server(zhandle_t *zh)
1582 {
1583 // NOTE: guard access to {hostname, addr_cur, addrs, addrs_old, addrs_new, last_resolve, resolve_delay_ms}
1584 lock_reconfig(zh);
1585
1586 memset(&zh->addr_cur, 0, sizeof(zh->addr_cur));
1587
1588 if (zh->reconfig)
1589 {
1590 if (get_next_server_in_reconfig(zh) == 0) {
1591 unlock_reconfig(zh);
1592 return;
1593 }
1594
1595 // tried all new and old servers and couldn't connect
1596 zh->reconfig = 0;
1597 }
1598
1599 addrvec_next(&zh->addrs, &zh->addr_cur);
1600
1601 unlock_reconfig(zh);
1602
1603 return;
1604 }
1605
1606 /**
1607 * Get the host:port for the server we are currently connecting to or connected
1608 * to. This is largely for testing purposes but is also generally useful for
1609 * other client software built on top of this client.
1610 */
1611 const char* zoo_get_current_server(zhandle_t* zh)
1612 {
1613 const char *endpoint_info = NULL;
1614
1615 // NOTE: guard access to {hostname, addr_cur, addrs, addrs_old, addrs_new, last_resolve, resolve_delay_ms}
1616 // Need the lock here as it is changed in update_addrs()
1617 lock_reconfig(zh);
1618
1619 endpoint_info = format_endpoint_info(&zh->addr_cur);
1620 unlock_reconfig(zh);
1621 return endpoint_info;
1622 }
1623
1624 /**
1625 * deallocated the free_path only its beeen allocated
1626 * and not equal to path
1627 */
1628 void free_duplicate_path(const char *free_path, const char* path) {
1629 if (free_path != path) {
1630 free((void*)free_path);
1631 }
1632 }
1633
1634 /**
1635 prepend the chroot path if available else return the path
1636 */
1637 static char* prepend_string(zhandle_t *zh, const char* client_path) {
1638 char *ret_str;
1639 if (zh == NULL || zh->chroot == NULL)
1640 return (char *) client_path;
1641 // handle the chroot itself, client_path = "/"
1642 if (strlen(client_path) == 1) {
1643 return strdup(zh->chroot);
1644 }
1645 ret_str = (char *) malloc(strlen(zh->chroot) + strlen(client_path) + 1);
1646 strcpy(ret_str, zh->chroot);
1647 return strcat(ret_str, client_path);
1648 }
1649
1650 /**
1651 strip off the chroot string from the server path
1652 if there is one else return the exact path
1653 */
1654 char* sub_string(zhandle_t *zh, const char* server_path) {
1655 char *ret_str;
1656 if (zh->chroot == NULL)
1657 return (char *) server_path;
1658 //ZOOKEEPER-1027
1659 if (strncmp(server_path, zh->chroot, strlen(zh->chroot)) != 0) {
1660 LOG_ERROR(LOGCALLBACK(zh), "server path %s does not include chroot path %s",
1661 server_path, zh->chroot);
1662 return (char *) server_path;
1663 }
1664 if (strlen(server_path) == strlen(zh->chroot)) {
1665 //return "/"
1666 ret_str = strdup("/");
1667 return ret_str;
1668 }
1669 ret_str = strdup(server_path + strlen(zh->chroot));
1670 return ret_str;
1671 }
1672
1673 static buffer_list_t *allocate_buffer(char *buff, int len)
1674 {
1675 buffer_list_t *buffer = calloc(1, sizeof(*buffer));
1676 if (buffer == 0)
1677 return 0;
1678
1679 buffer->len = len==0?sizeof(*buffer):len;
1680 buffer->curr_offset = 0;
1681 buffer->buffer = buff;
1682 buffer->next = 0;
1683 return buffer;
1684 }
1685
1686 static void free_buffer(buffer_list_t *b)
1687 {
1688 if (!b) {
1689 return;
1690 }
1691 if (b->buffer) {
1692 free(b->buffer);
1693 }
1694 free(b);
1695 }
1696
1697 static buffer_list_t *dequeue_buffer(buffer_head_t *list)
1698 {
1699 buffer_list_t *b;
1700 lock_buffer_list(list);
1701 b = list->head;
1702 if (b) {
1703 list->head = b->next;
1704 if (!list->head) {
1705 assert(b == list->last);
1706 list->last = 0;
1707 }
1708 }
1709 unlock_buffer_list(list);
1710 return b;
1711 }
1712
1713 static int remove_buffer(buffer_head_t *list)
1714 {
1715 buffer_list_t *b = dequeue_buffer(list);
1716 if (!b) {
1717 return 0;
1718 }
1719 free_buffer(b);
1720 return 1;
1721 }
1722
1723 static void queue_buffer(buffer_head_t *list, buffer_list_t *b, int add_to_front)
1724 {
1725 b->next = 0;
1726 lock_buffer_list(list);
1727 if (list->head) {
1728 assert(list->last);
1729 // The list is not empty
1730 if (add_to_front) {
1731 b->next = list->head;
1732 list->head = b;
1733 } else {
1734 list->last->next = b;
1735 list->last = b;
1736 }
1737 }else{
1738 // The list is empty
1739 assert(!list->head);
1740 list->head = b;
1741 list->last = b;
1742 }
1743 unlock_buffer_list(list);
1744 }
1745
1746 static int queue_buffer_bytes(buffer_head_t *list, char *buff, int len)
1747 {
1748 buffer_list_t *b = allocate_buffer(buff,len);
1749 if (!b)
1750 return ZSYSTEMERROR;
1751 queue_buffer(list, b, 0);
1752 return ZOK;
1753 }
1754
1755 static int queue_front_buffer_bytes(buffer_head_t *list, char *buff, int len)
1756 {
1757 buffer_list_t *b = allocate_buffer(buff,len);
1758 if (!b)
1759 return ZSYSTEMERROR;
1760 queue_buffer(list, b, 1);
1761 return ZOK;
1762 }
1763
1764 static __attribute__ ((unused)) int get_queue_len(buffer_head_t *list)
1765 {
1766 int i;
1767 buffer_list_t *ptr;
1768 lock_buffer_list(list);
1769 ptr = list->head;
1770 for (i=0; ptr!=0; ptr=ptr->next, i++)
1771 ;
1772 unlock_buffer_list(list);
1773 return i;
1774 }
/* returns:
 * -1 if send failed,
 * 0 if send would block while sending the buffer (or a send was incomplete),
 * 1 if success
 *
 * Sends one framed message: a 4-byte length prefix (htonl(buff->len))
 * followed by buff->len bytes from buff->buffer. buff->curr_offset counts
 * progress across prefix and payload together, so a partially sent buffer
 * resumes where the previous call stopped.
 *
 * NOTE(review): only EAGAIN (WSAEWOULDBLOCK on Windows) is treated as
 * "would block"; EWOULDBLOCK (if distinct) and EINTR are reported as -1.
 */
static int send_buffer(zhandle_t *zh, buffer_list_t *buff)
{
    int len = buff->len;
    int off = buff->curr_offset;
    int rc = -1;

    if (off < 4) {
        /* we need to send the length at the beginning */
        int nlen = htonl(len);
        char *b = (char*)&nlen;
        /* resume the prefix at the byte where the last call stopped */
        rc = zookeeper_send(zh->fd, b + off, sizeof(nlen) - off);
        if (rc == -1) {
#ifdef _WIN32
            if (WSAGetLastError() != WSAEWOULDBLOCK) {
#else
            if (errno != EAGAIN) {
#endif
                return -1;
            } else {
                return 0;
            }
        } else {
            buff->curr_offset += rc;
        }
        off = buff->curr_offset;
    }
    if (off >= 4) {
        /* want off to now represent the offset into the buffer */
        off -= sizeof(buff->len);
        rc = zookeeper_send(zh->fd, buff->buffer + off, len - off);
        if (rc == -1) {
#ifdef _WIN32
            if (WSAGetLastError() != WSAEWOULDBLOCK) {
#else
            if (errno != EAGAIN) {
#endif
                return -1;
            }
            /* would block: fall through and report "not yet complete" */
        } else {
            buff->curr_offset += rc;
        }
    }
    /* complete only once both the prefix and the payload have been sent */
    return buff->curr_offset == len + sizeof(buff->len);
}
1824
1825 /* returns:
1826 * -1 if recv call failed,
1827 * 0 if recv would block,
1828 * 1 if success
1829 */
1830 static int recv_buffer(zhandle_t *zh, buffer_list_t *buff)
1831 {
1832 int off = buff->curr_offset;
1833 int rc = 0;
1834
1835 /* if buffer is less than 4, we are reading in the length */
1836 if (off < 4) {
1837 char *buffer = (char*)&(buff->len);
1838 rc = zookeeper_recv(zh->fd, buffer+off, sizeof(int)-off, 0);
1839 switch (rc) {
1840 case 0:
1841 errno = EHOSTDOWN;
1842 case -1:
1843 #ifdef _WIN32
1844 if (WSAGetLastError() == WSAEWOULDBLOCK) {
1845 #else
1846 if (errno == EAGAIN) {
1847 #endif
1848 return 0;
1849 }
1850 return -1;
1851 default:
1852 buff->curr_offset += rc;
1853 }
1854 off = buff->curr_offset;
1855 if (buff->curr_offset == sizeof(buff->len)) {
1856 buff->len = ntohl(buff->len);
1857 buff->buffer = calloc(1, buff->len);
1858 }
1859 }
1860 if (buff->buffer) {
1861 /* want off to now represent the offset into the buffer */
1862 off -= sizeof(buff->len);
1863
1864 rc = zookeeper_recv(zh->fd, buff->buffer+off, buff->len-off, 0);
1865
1866 /* dirty hack to make new client work against old server
1867 * old server sends 40 bytes to finish connection handshake,
1868 * while we're expecting 41 (1 byte for read-only mode data) */
1869 if (rc > 0 && buff == &zh->primer_buffer) {
1870 /* primer_buffer's curr_offset starts at 4 (see prime_connection) */
1871 int avail = buff->curr_offset - sizeof(buff->len) + rc;
1872
1873 /* exactly 40 bytes (out of 41 expected) collected? */
1874 if (avail == buff->len - 1) {
1875 int32_t reply_len;
1876
1877 /* extract length of ConnectResponse (+ 1-byte flag?) */
1878 memcpy(&reply_len, buff->buffer, sizeof(reply_len));
1879 reply_len = ntohl(reply_len);
1880
1881 /* if 1-byte flag was not sent, fake it (value 0) */
1882 if ((int)(reply_len + sizeof(reply_len)) == buff->len - 1) {
1883 ++rc;
1884 }
1885 }
1886 }
1887
1888 switch(rc) {
1889 case 0:
1890 errno = EHOSTDOWN;
1891 case -1:
1892 #ifdef _WIN32
1893 if (WSAGetLastError() == WSAEWOULDBLOCK) {
1894 #else
1895 if (errno == EAGAIN) {
1896 #endif
1897 break;
1898 }
1899 return -1;
1900 default:
1901 buff->curr_offset += rc;
1902 }
1903 }
1904 return buff->curr_offset == buff->len + sizeof(buff->len);
1905 }
1906
1907 void free_buffers(buffer_head_t *list)
1908 {
1909 while (remove_buffer(list))
1910 ;
1911 }
1912
/*
 * Drains zh->sent_requests, completing every request still waiting for a
 * server reply with error code `reason`. Then fires all pending auth
 * completions with the same code.
 *
 * \param zh the handle being cleaned up
 * \param callCompletion if non-zero, each asynchronous request gets a fake
 *   reply (its xid, zxid -1, err = reason) queued onto
 *   zh->completions_to_process; if zero, such requests are not queued
 * \param reason the ZooKeeper error code reported to waiters and callbacks
 */
void free_completions(zhandle_t *zh,int callCompletion,int reason)
{
    completion_head_t tmp_list;
    struct oarchive *oa;
    struct ReplyHeader h;
    void_completion_t auth_completion = NULL;
    auth_completion_list_t a_list, *a_tmp;

    if (lock_completion_list(&zh->sent_requests) == 0) {
        // Detach the whole list under the lock, then walk it unlocked.
        tmp_list = zh->sent_requests;
        zh->sent_requests.head = 0;
        zh->sent_requests.last = 0;
        unlock_completion_list(&zh->sent_requests);
        while (tmp_list.head) {
            completion_list_t *cptr = tmp_list.head;

            tmp_list.head = cptr->next;
            if (cptr->c.data_result == SYNCHRONOUS_MARKER) {
#ifdef THREADED
                // Wake the thread blocked in the synchronous call with `reason`.
                struct sync_completion
                            *sc = (struct sync_completion*)cptr->data;
                sc->rc = reason;
                notify_sync_completion(sc);
                zh->outstanding_sync--;
                destroy_completion_entry(cptr);
#else
                abort_singlethreaded(zh);
#endif
            } else if (callCompletion) {
                // Fake the response
                buffer_list_t *bptr;
                h.xid = cptr->xid;
                h.zxid = -1;
                h.err = reason;
                oa = create_buffer_oarchive();
                serialize_ReplyHeader(oa, "header", &h);
                bptr = calloc(sizeof(*bptr), 1);
                // NOTE(review): assert() is compiled out under NDEBUG, so a
                // failed calloc would be dereferenced on the next line.
                assert(bptr);
                bptr->len = get_buffer_len(oa);
                bptr->buffer = get_buffer(oa);
                // The serialized bytes now belong to bptr; keep them.
                close_buffer_oarchive(&oa, 0);
                cptr->buffer = bptr;
                queue_completion(&zh->completions_to_process, cptr, 0);
            }
            // NOTE(review): with callCompletion == 0, an asynchronous entry
            // is unlinked here but never destroyed; this looks like a leak
            // unless callers guarantee such entries cannot be present.
        }
    }

    // Collect pending auth completions under the auth lock, then invoke
    // them without holding it.
    zoo_lock_auth(zh);
    a_list.completion = NULL;
    a_list.next = NULL;
    get_auth_completions(&zh->auth_h, &a_list);
    zoo_unlock_auth(zh);

    a_tmp = &a_list;
    // chain call user's completion function
    while (a_tmp->completion != NULL) {
        auth_completion = a_tmp->completion;
        auth_completion(reason, a_tmp->auth_data);
        a_tmp = a_tmp->next;
        if (a_tmp == NULL)
            break;
    }

    free_auth_completion(&a_list);
}
1978
1979 static void cleanup_bufs(zhandle_t *zh,int callCompletion,int rc)
1980 {
1981 enter_critical(zh);
1982 free_buffers(&zh->to_send);
1983 free_buffers(&zh->to_process);
1984 free_completions(zh,callCompletion,rc);
1985 leave_critical(zh);
1986 if (zh->input_buffer && zh->input_buffer != &zh->primer_buffer) {
1987 free_buffer(zh->input_buffer);
1988 zh->input_buffer = 0;
1989 }
1990 }
1991
1992 /* return 1 if zh's state is ZOO_CONNECTED_STATE or ZOO_READONLY_STATE,
1993 * 0 otherwise */
1994 static int is_connected(zhandle_t* zh)
1995 {
1996 return (zh->state==ZOO_CONNECTED_STATE || zh->state==ZOO_READONLY_STATE);
1997 }
1998
/*
 * Tears down the current connection: closes the socket, notifies the
 * watcher, fails all in-flight requests with rc and resets the state.
 *
 * When is_unrecoverable(zh), the watcher receives a ZOO_SESSION_EVENT with
 * the current state, and that state is preserved. Otherwise, if the handle
 * was connected (or read-only), the watcher sees ZOO_CONNECTING_STATE, and
 * zh->state is reset to 0 in either case.
 */
static void cleanup(zhandle_t *zh,int rc)
{
    close_zsock(zh->fd);
    if (is_unrecoverable(zh)) {
        LOG_DEBUG(LOGCALLBACK(zh), "Calling a watcher for a ZOO_SESSION_EVENT and the state=%s",
                state2String(zh->state));
        PROCESS_SESSION_EVENT(zh, zh->state);
    } else if (is_connected(zh)) {
        LOG_DEBUG(LOGCALLBACK(zh), "Calling a watcher for a ZOO_SESSION_EVENT and the state=CONNECTING_STATE");
        PROCESS_SESSION_EVENT(zh, ZOO_CONNECTING_STATE);
    }
    // Drop queued buffers and complete pending requests with rc.
    cleanup_bufs(zh,1,rc);

    LOG_DEBUG(LOGCALLBACK(zh), "Previous connection=%s delay=%d", zoo_get_current_server(zh), zh->delay);

    if (!is_unrecoverable(zh)) {
        zh->state = 0;
    }
    // Deliver the completions queued above when allowed to do so here.
    if (process_async(zh->outstanding_sync)) {
        process_completions(zh);
    }
}
2021
2022 static void handle_error(zhandle_t *zh,int rc)
2023 {
2024 cleanup(zh, rc);
2025 // NOTE: If we're at the end of the list of addresses to connect to, then
2026 // we want to delay the next connection attempt to avoid spinning.
2027 // Then increment what host we'll connect to since we failed to connect to current
2028 zh->delay = addrvec_atend(&zh->addrs);
2029 addrvec_next(&zh->addrs, &zh->addr_cur);
2030 }
2031
2032 static int handle_socket_error_msg(zhandle_t *zh, int line, int rc,
2033 const char* format, ...)
2034 {
2035 if(logLevel>=ZOO_LOG_LEVEL_ERROR){
2036 va_list va;
2037 char buf[1024];
2038 va_start(va,format);
2039 vsnprintf(buf, sizeof(buf)-1,format,va);
2040 log_message(LOGCALLBACK(zh), ZOO_LOG_LEVEL_ERROR,line,__func__,
2041 "Socket %s zk retcode=%d, errno=%d(%s): %s",
2042 zoo_get_current_server(zh),rc,errno,strerror(errno),buf);
2043 va_end(va);
2044 }
2045 handle_error(zh,rc);
2046 return rc;
2047 }
2048
2049 static void auth_completion_func(int rc, zhandle_t* zh)
2050 {
2051 void_completion_t auth_completion = NULL;
2052 auth_completion_list_t a_list;
2053 auth_completion_list_t *a_tmp;
2054
2055 if(zh==NULL)
2056 return;
2057
2058 zoo_lock_auth(zh);
2059
2060 if(rc!=0){
2061 zh->state=ZOO_AUTH_FAILED_STATE;
2062 }else{
2063 //change state for all auths
2064 mark_active_auth(zh);
2065 }
2066 a_list.completion = NULL;
2067 a_list.next = NULL;
2068 get_auth_completions(&zh->auth_h, &a_list);
2069 zoo_unlock_auth(zh);
2070 if (rc) {
2071 LOG_ERROR(LOGCALLBACK(zh), "Authentication scheme %s failed. Connection closed.",
2072 zh->auth_h.auth->scheme);
2073 }
2074 else {
2075 LOG_INFO(LOGCALLBACK(zh), "Authentication scheme %s succeeded", zh->auth_h.auth->scheme);
2076 }
2077 a_tmp = &a_list;
2078 // chain call user's completion function
2079 while (a_tmp->completion != NULL) {
2080 auth_completion = a_tmp->completion;
2081 auth_completion(rc, a_tmp->auth_data);
2082 a_tmp = a_tmp->next;
2083 if (a_tmp == NULL)
2084 break;
2085 }
2086 free_auth_completion(&a_list);
2087 }
2088
2089 static int send_info_packet(zhandle_t *zh, auth_info* auth) {
2090 struct oarchive *oa;
2091 struct RequestHeader h = {AUTH_XID, ZOO_SETAUTH_OP};
2092 struct AuthPacket req;
2093 int rc;
2094 oa = create_buffer_oarchive();
2095 rc = serialize_RequestHeader(oa, "header", &h);
2096 req.type=0; // ignored by the server
2097 req.scheme = auth->scheme;
2098 req.auth = auth->auth;
2099 rc = rc < 0 ? rc : serialize_AuthPacket(oa, "req", &req);
2100 /* add this buffer to the head of the send queue */
2101 rc = rc < 0 ? rc : queue_front_buffer_bytes(&zh->to_send, get_buffer(oa),
2102 get_buffer_len(oa));
2103 /* We queued the buffer, so don't free it */
2104 close_buffer_oarchive(&oa, 0);
2105
2106 return rc;
2107 }
2108
2109 /** send all auths, not just the last one **/
2110 static int send_auth_info(zhandle_t *zh) {
2111 int rc = 0;
2112 auth_info *auth = NULL;
2113
2114 zoo_lock_auth(zh);
2115 auth = zh->auth_h.auth;
2116 if (auth == NULL) {
2117 zoo_unlock_auth(zh);
2118 return ZOK;
2119 }
2120 while (auth != NULL) {
2121 rc = send_info_packet(zh, auth);
2122 auth = auth->next;
2123 }
2124 zoo_unlock_auth(zh);
2125 LOG_DEBUG(LOGCALLBACK(zh), "Sending all auth info request to %s", zoo_get_current_server(zh));
2126 return (rc <0) ? ZMARSHALLINGERROR:ZOK;
2127 }
2128
2129 static int send_last_auth_info(zhandle_t *zh)
2130 {
2131 int rc = 0;
2132 auth_info *auth = NULL;
2133
2134 zoo_lock_auth(zh);
2135 auth = get_last_auth(&zh->auth_h);
2136 if(auth==NULL) {
2137 zoo_unlock_auth(zh);
2138 return ZOK; // there is nothing to send
2139 }
2140 rc = send_info_packet(zh, auth);
2141 zoo_unlock_auth(zh);
2142 LOG_DEBUG(LOGCALLBACK(zh), "Sending auth info request to %s",zoo_get_current_server(zh));
2143 return (rc < 0)?ZMARSHALLINGERROR:ZOK;
2144 }
2145
/* Free `count` heap-allocated strings and then the array that holds them. */
static void free_key_list(char **list, int count)
{
    int idx = count;

    while (idx-- > 0) {
        free(list[idx]);
    }
    free(list);
}
2155
2156 static int send_set_watches(zhandle_t *zh)
2157 {
2158 struct oarchive *oa;
2159 struct RequestHeader h = {SET_WATCHES_XID, ZOO_SETWATCHES_OP};
2160 struct SetWatches req;
2161 int rc;
2162
2163 req.relativeZxid = zh->last_zxid;
2164 lock_watchers(zh);
2165 req.dataWatches.data = collect_keys(zh->active_node_watchers, (int*)&req.dataWatches.count);
2166 req.existWatches.data = collect_keys(zh->active_exist_watchers, (int*)&req.existWatches.count);
2167 req.childWatches.data = collect_keys(zh->active_child_watchers, (int*)&req.childWatches.count);
2168 unlock_watchers(zh);
2169
2170 // return if there are no pending watches
2171 if (!req.dataWatches.count && !req.existWatches.count &&
2172 !req.childWatches.count) {
2173 free_key_list(req.dataWatches.data, req.dataWatches.count);
2174 free_key_list(req.existWatches.data, req.existWatches.count);
2175 free_key_list(req.childWatches.data, req.childWatches.count);
2176 return ZOK;
2177 }
2178
2179
2180 oa = create_buffer_oarchive();
2181 rc = serialize_RequestHeader(oa, "header", &h);
2182 rc = rc < 0 ? rc : serialize_SetWatches(oa, "req", &req);
2183 /* add this buffer to the head of the send queue */
2184 rc = rc < 0 ? rc : queue_front_buffer_bytes(&zh->to_send, get_buffer(oa),
2185 get_buffer_len(oa));
2186 /* We queued the buffer, so don't free it */
2187 close_buffer_oarchive(&oa, 0);
2188 free_key_list(req.dataWatches.data, req.dataWatches.count);
2189 free_key_list(req.existWatches.data, req.existWatches.count);
2190 free_key_list(req.childWatches.data, req.childWatches.count);
2191 LOG_DEBUG(LOGCALLBACK(zh), "Sending set watches request to %s",zoo_get_current_server(zh));
2192 return (rc < 0)?ZMARSHALLINGERROR:ZOK;
2193 }
2194
2195 static int serialize_prime_connect(struct connect_req *req, char* buffer){
2196 //this should be the order of serialization
2197 int offset = 0;
2198 req->protocolVersion = htonl(req->protocolVersion);
2199 memcpy(buffer + offset, &req->protocolVersion, sizeof(req->protocolVersion));
2200 offset = offset + sizeof(req->protocolVersion);
2201
2202 req->lastZxidSeen = zoo_htonll(req->lastZxidSeen);
2203 memcpy(buffer + offset, &req->lastZxidSeen, sizeof(req->lastZxidSeen));
2204 offset = offset + sizeof(req->lastZxidSeen);
2205
2206 req->timeOut = htonl(req->timeOut);
2207 memcpy(buffer + offset, &req->timeOut, sizeof(req->timeOut));
2208 offset = offset + sizeof(req->timeOut);
2209
2210 req->sessionId = zoo_htonll(req->sessionId);
2211 memcpy(buffer + offset, &req->sessionId, sizeof(req->sessionId));
2212 offset = offset + sizeof(req->sessionId);
2213
2214 req->passwd_len = htonl(req->passwd_len);
2215 memcpy(buffer + offset, &req->passwd_len, sizeof(req->passwd_len));
2216 offset = offset + sizeof(req->passwd_len);
2217
2218 memcpy(buffer + offset, req->passwd, sizeof(req->passwd));
2219 offset = offset + sizeof(req->passwd);
2220
2221 memcpy(buffer + offset, &req->readOnly, sizeof(req->readOnly));
2222
2223 return 0;
2224 }
2225
2226 static int deserialize_prime_response(struct prime_struct *resp, char* buffer)
2227 {
2228 //this should be the order of deserialization
2229 int offset = 0;
2230 memcpy(&resp->len, buffer + offset, sizeof(resp->len));
2231 offset = offset + sizeof(resp->len);
2232
2233 resp->len = ntohl(resp->len);
2234 memcpy(&resp->protocolVersion,
2235 buffer + offset,
2236 sizeof(resp->protocolVersion));
2237 offset = offset + sizeof(resp->protocolVersion);
2238
2239 resp->protocolVersion = ntohl(resp->protocolVersion);
2240 memcpy(&resp->timeOut, buffer + offset, sizeof(resp->timeOut));
2241 offset = offset + sizeof(resp->timeOut);
2242
2243 resp->timeOut = ntohl(resp->timeOut);
2244 memcpy(&resp->sessionId, buffer + offset, sizeof(resp->sessionId));
2245 offset = offset + sizeof(resp->sessionId);
2246
2247 resp->sessionId = zoo_htonll(resp->sessionId);
2248 memcpy(&resp->passwd_len, buffer + offset, sizeof(resp->passwd_len));
2249 offset = offset + sizeof(resp->passwd_len);
2250
2251 resp->passwd_len = ntohl(resp->passwd_len);
2252 memcpy(resp->passwd, buffer + offset, sizeof(resp->passwd));
2253 offset = offset + sizeof(resp->passwd);
2254
2255 memcpy(&resp->readOnly, buffer + offset, sizeof(resp->readOnly));
2256
2257 return 0;
2258 }
2259
/*
 * Send the session handshake (length prefix + serialized connect request)
 * on zh->fd and prepare to read the server's reply into zh->primer_buffer.
 *
 * Returns ZOK without sending anything while the state is
 * ZOO_SSL_CONNECTING_STATE (TLS handshake not done yet). On success the
 * handle moves to ZOO_ASSOCIATING_STATE and ZOK is returned. A send error
 * is reported through handle_socket_error_msg() with ZCONNECTIONLOSS.
 */
static int prime_connection(zhandle_t *zh)
{
    int rc;
    /*this is the size of buffer to serialize req into*/
    char buffer_req[HANDSHAKE_REQ_SIZE];
    int len = sizeof(buffer_req);
    int hlen = 0;
    struct connect_req req;

    if (zh->state == ZOO_SSL_CONNECTING_STATE) {
        // The SSL connection is yet to happen.
        return ZOK;
    }
    req.protocolVersion = 0;
    /* resume the known session id only once a r/w server has been seen;
     * otherwise send 0 (presumably asking for a new session — confirm) */
    req.sessionId = zh->seen_rw_server_before ? zh->client_id.client_id : 0;
    req.passwd_len = sizeof(req.passwd);
    memcpy(req.passwd, zh->client_id.passwd, sizeof(zh->client_id.passwd));
    req.timeOut = zh->recv_timeout;
    req.lastZxidSeen = zh->last_zxid;
    req.readOnly = zh->allow_read_only;
    /* length prefix in network byte order; sizeof(len) == sizeof(hlen) (both int) */
    hlen = htonl(len);
    /* We are running fast and loose here, but this string should fit in the initial buffer! */
    /* NOTE(review): only rc < 0 is treated as failure; a short (partial) send is not detected */
    rc=zookeeper_send(zh->fd, &hlen, sizeof(len));
    serialize_prime_connect(&req, buffer_req);
    rc=rc<0 ? rc : zookeeper_send(zh->fd, buffer_req, len);
    if (rc<0) {
        return handle_socket_error_msg(zh, __LINE__, ZCONNECTIONLOSS,
                "failed to send a handshake packet: %s", strerror(errno));
    }
    zh->state = ZOO_ASSOCIATING_STATE;

    /* the handshake reply is read into the handle's fixed primer buffer */
    zh->input_buffer = &zh->primer_buffer;
    memset(zh->input_buffer->buffer, 0, zh->input_buffer->len);

    /* This seems a bit weird to to set the offset to 4, but we already have a
     * length, so we skip reading the length (and allocating the buffer) by
     * saying that we are already at offset 4 */
    zh->input_buffer->curr_offset = 4;

    return ZOK;
}
2301
/*
 * Return the signed number of milliseconds from `start` to `end`
 * (negative when `end` precedes `start`); sub-millisecond remainders are
 * truncated toward zero.
 *
 * The arithmetic is done in long long. `tv_sec * 1000` in time_t/long
 * overflows (undefined behavior) on platforms with a 32-bit long once the
 * delta exceeds about 24 days, e.g. when comparing an unset (zero)
 * timestamp against the current time. The result is clamped to the
 * int range.
 */
static inline int calculate_interval(const struct timeval *start,
        const struct timeval *end)
{
    long long sec = (long long)end->tv_sec - (long long)start->tv_sec;
    long long usec = (long long)end->tv_usec - (long long)start->tv_usec;
    long long interval = sec * 1000 + usec / 1000;

    if (interval > INT_MAX) {
        return INT_MAX;
    }
    if (interval < INT_MIN) {
        return INT_MIN;
    }
    return (int)interval;
}
2312
/*
 * Convert a duration in milliseconds into a struct timeval.
 * Negative durations are treated as zero.
 */
static struct timeval get_timeval(int interval)
{
    int ms = interval < 0 ? 0 : interval;
    struct timeval result;

    result.tv_sec = ms / 1000;
    result.tv_usec = (ms % 1000) * 1000;
    return result;
}
2323
/* Forward declarations of static completion-registration helpers whose
 * definitions appear elsewhere in this file. */
static int add_void_completion(zhandle_t *zh, int xid, void_completion_t dc,
    const void *data);
static int add_string_completion(zhandle_t *zh, int xid,
    string_completion_t dc, const void *data);
static int add_string_stat_completion(zhandle_t *zh, int xid,
    string_stat_completion_t dc, const void *data);
2330
2331
2332 int send_ping(zhandle_t* zh)
2333 {
2334 int rc;
2335 struct oarchive *oa = create_buffer_oarchive();
2336 struct RequestHeader h = {PING_XID, ZOO_PING_OP};
2337
2338 rc = serialize_RequestHeader(oa, "header", &h);
2339 enter_critical(zh);
2340 get_system_time(&zh->last_ping);
2341 rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
2342 get_buffer_len(oa));
2343 leave_critical(zh);
2344 close_buffer_oarchive(&oa, 0);
2345 return rc<0 ? rc : adaptor_send_queue(zh, 0);
2346 }
2347
/* upper bound of a timeout for seeking for r/w server when in read-only mode */
const int MAX_RW_TIMEOUT = 60000;
/* initial (and post-success) interval between r/w server probes while in
 * read-only mode; zookeeper_interest() doubles it after each probe, capped
 * at MAX_RW_TIMEOUT. Both are compared against calculate_interval()
 * results, i.e. milliseconds. */
const int MIN_RW_TIMEOUT = 200;
2351
2352 static int ping_rw_server(zhandle_t* zh)
2353 {
2354 char buf[10];
2355 zsock_t fd;
2356 int rc;
2357 sendsize_t ssize;
2358 int sock_flags;
2359
2360 addrvec_peek(&zh->addrs, &zh->addr_rw_server);
2361
2362 #ifdef SOCK_CLOEXEC_ENABLED
2363 sock_flags = SOCK_STREAM | SOCK_CLOEXEC;
2364 #else
2365 sock_flags = SOCK_STREAM;
2366 #endif
2367 fd.sock = socket(zh->addr_rw_server.ss_family, sock_flags, 0);
2368 if (fd.sock < 0) {
2369 return 0;
2370 }
2371
2372 zookeeper_set_sock_nodelay(zh, fd.sock);
2373 zookeeper_set_sock_timeout(zh, fd.sock, 1);
2374
2375 rc = zookeeper_connect(zh, &zh->addr_rw_server, fd.sock);
2376 if (rc < 0) {
2377 return 0;
2378 }
2379
2380 #ifdef HAVE_OPENSSL_H
2381 fd.ssl_sock = NULL;
2382 fd.ssl_ctx = NULL;
2383
2384 if (zh->fd->cert != NULL) {
2385 fd.cert = zh->fd->cert;
2386 rc = init_ssl_for_socket(&fd, zh, 0);
2387 if (rc != ZOK) {
2388 rc = 0;
2389 goto out;
2390 }
2391 }
2392 #endif
2393
2394 ssize = zookeeper_send(&fd, "isro", 4);
2395 if (ssize < 0) {
2396 rc = 0;
2397 goto out;
2398 }
2399
2400 memset(buf, 0, sizeof(buf));
2401 rc = zookeeper_recv(&fd, buf, sizeof(buf), 0);
2402 if (rc < 0) {
2403 rc = 0;
2404 goto out;
2405 }
2406
2407 rc = strcmp("rw", buf) == 0;
2408
2409 out:
2410 close_zsock(&fd);
2411 addr_rw_server = rc ? &zh->addr_rw_server : 0;
2412 return rc;
2413 }
2414
#if !defined(WIN32) && !defined(min)
/* Smaller of two ints; only defined when WIN32 is not defined and no
 * `min` macro is already in scope. */
static inline int min(int a, int b)
{
    if (b < a) {
        return b;
    }
    return a;
}
#endif
2421
2422 static void zookeeper_set_sock_noblock(zhandle_t *zh, socket_t sock)
2423 {
2424 #ifdef _WIN32
2425 ULONG nonblocking_flag = 1;
2426
2427 ioctlsocket(sock, FIONBIO, &nonblocking_flag);
2428 #else
2429 fcntl(sock, F_SETFL, O_NONBLOCK|fcntl(sock, F_GETFL, 0));
2430 #endif
2431 }
2432
2433 static void zookeeper_set_sock_timeout(zhandle_t *zh, socket_t s, int timeout)
2434 {
2435 struct timeval tv;
2436
2437 tv.tv_sec = timeout;
2438 setsockopt(s, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(struct timeval));
2439 setsockopt(s, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(struct timeval));
2440 }
2441
2442 static void zookeeper_set_sock_nodelay(zhandle_t *zh, socket_t sock)
2443 {
2444 #ifdef _WIN32
2445 char enable_tcp_nodelay = 1;
2446 #else
2447 int enable_tcp_nodelay = 1;
2448 #endif
2449 int rc;
2450
2451 rc = setsockopt(sock,
2452 IPPROTO_TCP,
2453 TCP_NODELAY,
2454 &enable_tcp_nodelay,
2455 sizeof(enable_tcp_nodelay));
2456
2457 if (rc) {
2458 LOG_WARN(LOGCALLBACK(zh),
2459 "Unable to set TCP_NODELAY, latency may be effected");
2460 }
2461 }
2462
/*
 * Call connect() on `fd` for `addr`. The address length is taken from the
 * family: sockaddr_in6 for AF_INET6 (when AF_INET6 exists), sockaddr_in
 * otherwise.
 *
 * Returns connect()'s result. On Windows, errno is set from GetLastError().
 * With _MSC_VER >= 1600, WSAEWOULDBLOCK/WSAEINPROGRESS are also mapped to
 * EWOULDBLOCK/EINPROGRESS so callers can test errno the same way everywhere.
 *
 * NOTE(review): the declared return type is socket_t although the value
 * is connect()'s int result; callers compare it against -1 / < 0.
 */
static socket_t zookeeper_connect(zhandle_t *zh,
                                  struct sockaddr_storage *addr,
                                  socket_t fd)
{
    int rc;
    int addr_len;

#if defined(AF_INET6)
    if (addr->ss_family == AF_INET6) {
        addr_len = sizeof(struct sockaddr_in6);
    } else {
        addr_len = sizeof(struct sockaddr_in);
    }
#else
    addr_len = sizeof(struct sockaddr_in);
#endif

    LOG_DEBUG(LOGCALLBACK(zh), "[zk] connect()\n");
    rc = connect(fd, (struct sockaddr *)addr, addr_len);

#ifdef _WIN32
    errno = GetLastError();

#ifndef EWOULDBLOCK
#define EWOULDBLOCK WSAEWOULDBLOCK
#endif

#ifndef EINPROGRESS
#define EINPROGRESS WSAEINPROGRESS
#endif

#if _MSC_VER >= 1600
    /* newer CRTs define their own EWOULDBLOCK/EINPROGRESS values that differ
     * from the WinSock codes, so translate them */
    switch(errno) {
    case WSAEWOULDBLOCK:
        errno = EWOULDBLOCK;
        break;
    case WSAEINPROGRESS:
        errno = EINPROGRESS;
        break;
    }
#endif
#endif

    return rc;
}
2508
2509 int zookeeper_interest(zhandle_t *zh, socket_t *fd, int *interest,
2510 struct timeval *tv)
2511 {
2512 int sock_flags;
2513 int rc = 0;
2514 struct timeval now;
2515
2516 #ifdef SOCK_CLOEXEC_ENABLED
2517 sock_flags = SOCK_STREAM | SOCK_CLOEXEC;
2518 #else
2519 sock_flags = SOCK_STREAM;
2520 #endif
2521
2522 if(zh==0 || fd==0 ||interest==0 || tv==0)
2523 return ZBADARGUMENTS;
2524 if (is_unrecoverable(zh))
2525 return ZINVALIDSTATE;
2526 get_system_time(&now);
2527 if(zh->next_deadline.tv_sec!=0 || zh->next_deadline.tv_usec!=0){
2528 int time_left = calculate_interval(&zh->next_deadline, &now);
2529 int max_exceed = zh->recv_timeout / 10 > 200 ? 200 :
2530 (zh->recv_timeout / 10);
2531 if (time_left > max_exceed)
2532 LOG_WARN(LOGCALLBACK(zh), "Exceeded deadline by %dms", time_left);
2533 }
2534 api_prolog(zh);
2535
2536 rc = update_addrs(zh, &now);
2537 if (rc != ZOK) {
2538 return api_epilog(zh, rc);
2539 }
2540
2541 *fd = zh->fd->sock;
2542 *interest = 0;
2543 tv->tv_sec = 0;
2544 tv->tv_usec = 0;
2545
2546 if (*fd == -1) {
2547 /*
2548 * If we previously failed to connect to server pool (zh->delay == 1)
2549 * then we need delay our connection on this iteration 1/60 of the
2550 * recv timeout before trying again so we don't spin.
2551 *
2552 * We always clear the delay setting. If we fail again, we'll set delay
2553 * again and on the next iteration we'll do the same.
2554 *
2555 * We will also delay if the disable_reconnection_attempt is set.
2556 */
2557 if (zh->delay == 1 || zh->disable_reconnection_attempt == 1) {
2558 *tv = get_timeval(zh->recv_timeout/60);
2559 zh->delay = 0;
2560
2561 LOG_WARN(LOGCALLBACK(zh), "Delaying connection after exhaustively trying all servers [%s]",
2562 zh->hostname);
2563 } else {
2564 if (addr_rw_server) {
2565 zh->addr_cur = *addr_rw_server;
2566 addr_rw_server = 0;
2567 } else {
2568 // No need to delay -- grab the next server and attempt connection
2569 zoo_cycle_next_server(zh);
2570 }
2571 zh->fd->sock = socket(zh->addr_cur.ss_family, sock_flags, 0);
2572 if (zh->fd->sock < 0) {
2573 rc = handle_socket_error_msg(zh,
2574 __LINE__,
2575 ZSYSTEMERROR,
2576 "socket() call failed");
2577 return api_epilog(zh, rc);
2578 }
2579
2580 zookeeper_set_sock_nodelay(zh, zh->fd->sock);
2581 zookeeper_set_sock_noblock(zh, zh->fd->sock);
2582
2583 rc = zookeeper_connect(zh, &zh->addr_cur, zh->fd->sock);
2584
2585 if (rc == -1) {
2586 /* we are handling the non-blocking connect according to
2587 * the description in section 16.3 "Non-blocking connect"
2588 * in UNIX Network Programming vol 1, 3rd edition */
2589 if (errno == EWOULDBLOCK || errno == EINPROGRESS) {
2590 // For SSL, we first go to ZOO_SSL_CONNECTING_STATE
2591 if (zh->fd->cert != NULL)
2592 zh->state = ZOO_SSL_CONNECTING_STATE;
2593 else
2594 zh->state = ZOO_CONNECTING_STATE;
2595 } else {
2596 rc = handle_socket_error_msg(zh,
2597 __LINE__,
2598 ZCONNECTIONLOSS,
2599 "connect() call failed");
2600 return api_epilog(zh, rc);
2601 }
2602 } else {
2603 #ifdef HAVE_OPENSSL_H
2604 if (zh->fd->cert != NULL) {
2605 // We do SSL_connect() here
2606 if (init_ssl_for_handler(zh) != ZOK) {
2607 return ZSSLCONNECTIONERROR;
2608 }
2609 }
2610 #endif
2611 rc = prime_connection(zh);
2612 if (rc != 0) {
2613 return api_epilog(zh,rc);
2614 }
2615
2616 LOG_INFO(LOGCALLBACK(zh),
2617 "Initiated connection to server %s",
2618 format_endpoint_info(&zh->addr_cur));
2619 }
2620 *tv = get_timeval(zh->recv_timeout/3);
2621 }
2622 *fd = zh->fd->sock;
2623 zh->last_recv = now;
2624 zh->last_send = now;
2625 zh->last_ping = now;
2626 zh->last_ping_rw = now;
2627 zh->ping_rw_timeout = MIN_RW_TIMEOUT;
2628 }
2629
2630 if (zh->fd->sock != -1) {
2631 int idle_recv = calculate_interval(&zh->last_recv, &now);
2632 int idle_send = calculate_interval(&zh->last_send, &now);
2633 int recv_to = zh->recv_timeout*2/3 - idle_recv;
2634 int send_to = zh->recv_timeout/3;
2635 // have we exceeded the receive timeout threshold?
2636 if (recv_to <= 0 && zh->state != ZOO_SSL_CONNECTING_STATE) {
2637 // We gotta cut our losses and connect to someone else
2638 #ifdef _WIN32
2639 errno = WSAETIMEDOUT;
2640 #else
2641 errno = ETIMEDOUT;
2642 #endif
2643 *interest=0;
2644 *tv = get_timeval(0);
2645 return api_epilog(zh,handle_socket_error_msg(zh,
2646 __LINE__,ZOPERATIONTIMEOUT,
2647 "connection to %s timed out (exceeded timeout by %dms)",
2648 format_endpoint_info(&zh->addr_cur),
2649 -recv_to));
2650
2651 }
2652
2653 // We only allow 1/3 of our timeout time to expire before sending
2654 // a PING
2655 if (is_connected(zh)) {
2656 send_to = zh->recv_timeout/3 - idle_send;
2657 if (send_to <= 0) {
2658 if (zh->sent_requests.head == 0) {
2659 rc = send_ping(zh);
2660 if (rc < 0) {
2661 LOG_ERROR(LOGCALLBACK(zh), "failed to send PING request (zk retcode=%d)",rc);
2662 return api_epilog(zh,rc);
2663 }
2664 }
2665 send_to = zh->recv_timeout/3;
2666 }
2667 }
2668
2669 // If we are in read-only mode, seek for read/write server
2670 if (zh->state == ZOO_READONLY_STATE) {
2671 int idle_ping_rw = calculate_interval(&zh->last_ping_rw, &now);
2672 if (idle_ping_rw >= zh->ping_rw_timeout) {
2673 zh->last_ping_rw = now;
2674 idle_ping_rw = 0;
2675 zh->ping_rw_timeout = min(zh->ping_rw_timeout * 2,
2676 MAX_RW_TIMEOUT);
2677 if (ping_rw_server(zh)) {
2678 struct sockaddr_storage addr;
2679 addrvec_peek(&zh->addrs, &addr);
2680 zh->ping_rw_timeout = MIN_RW_TIMEOUT;
2681 LOG_INFO(LOGCALLBACK(zh),
2682 "r/w server found at %s",
2683 format_endpoint_info(&addr));
2684 cleanup(zh, ZOK);
2685 } else {
2686 addrvec_next(&zh->addrs, NULL);
2687 }
2688 }
2689 send_to = min(send_to, zh->ping_rw_timeout - idle_ping_rw);
2690 }
2691
2692 // choose the lesser value as the timeout
2693 *tv = get_timeval(min(recv_to, send_to));
2694
2695 zh->next_deadline.tv_sec = now.tv_sec + tv->tv_sec;
2696 zh->next_deadline.tv_usec = now.tv_usec + tv->tv_usec;
2697 if (zh->next_deadline.tv_usec > 1000000) {
2698 zh->next_deadline.tv_sec += zh->next_deadline.tv_usec / 1000000;
2699 zh->next_deadline.tv_usec = zh->next_deadline.tv_usec % 1000000;
2700 }
2701 *interest = ZOOKEEPER_READ;
2702 /* we are interested in a write if we are connected and have something
2703 * to send, or we are waiting for a connect to finish. */
2704 if ((zh->to_send.head && (is_connected(zh) || is_sasl_auth_in_progress(zh)))
2705 || zh->state == ZOO_CONNECTING_STATE
2706 || zh->state == ZOO_SSL_CONNECTING_STATE) {
2707 *interest |= ZOOKEEPER_WRITE;
2708 }
2709 }
2710 return api_epilog(zh,ZOK);
2711 }
2712
2713 #ifdef HAVE_OPENSSL_H
2714
2715 /*
2716 * use this function, if you want to init SSL for the socket currently registered in the zookeeper handler
2717 */
2718 static int init_ssl_for_handler(zhandle_t *zh)
2719 {
2720 int rc = init_ssl_for_socket(zh->fd, zh, 1);
2721 if (rc == ZOK) {
2722 // (SUCCESS) Now mark the ZOO_CONNECTING_STATE so that
2723 // prime_connection() happen.
2724 // prime_connection() only happens in ZOO_CONNECTING_STATE
2725 zh->state = ZOO_CONNECTING_STATE;
2726 }
2727 return rc;
2728 }
2729
2730 /*
2731 * use this function, if you want to init SSL for a socket, pointing to a different server address than the one
2732 * currently registered in the zookeeper handler (e.g. ping other servers when you are connected to a read-only one)
2733 */
2734 static int init_ssl_for_socket(zsock_t *fd, zhandle_t *zh, int fail_on_error) {
2735
2736 SSL_CTX **ctx;
2737
2738 if (!fd->ssl_sock) {
2739 const SSL_METHOD *method;
2740
2741 #if OPENSSL_VERSION_NUMBER < 0x10100000L
2742 OpenSSL_add_all_algorithms();
2743 ERR_load_BIO_strings();
2744 ERR_load_crypto_strings();
2745 SSL_load_error_strings();
2746 SSL_library_init();
2747 method = SSLv23_client_method();
2748 #else
2749 OPENSSL_init_ssl(OPENSSL_INIT_LOAD_SSL_STRINGS | OPENSSL_INIT_LOAD_CRYPTO_STRINGS, NULL);
2750 method = TLS_client_method();
2751 #endif
2752 if (FIPS_mode() == 0) {
2753 LOG_INFO(LOGCALLBACK(zh), "FIPS mode is OFF ");
2754 } else {
2755 LOG_INFO(LOGCALLBACK(zh), "FIPS mode is ON ");
2756 }
2757 fd->ssl_ctx = SSL_CTX_new(method);
2758 ctx = &fd->ssl_ctx;
2759
2760 SSL_CTX_set_verify(*ctx, SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT, 0);
2761 /*SERVER CA FILE*/
2762 if (SSL_CTX_load_verify_locations(*ctx, fd->cert->ca, 0) != 1) {
2763 SSL_CTX_free(*ctx);
2764 LOG_ERROR(LOGCALLBACK(zh), "Failed to load CA file %s", fd->cert->ca);
2765 errno = EINVAL;
2766 return ZBADARGUMENTS;
2767 }
2768 if (SSL_CTX_set_default_verify_paths(*ctx) != 1) {
2769 SSL_CTX_free(*ctx);
2770 LOG_ERROR(LOGCALLBACK(zh), "Call to SSL_CTX_set_default_verify_paths failed");
2771 errno = EINVAL;
2772 return ZBADARGUMENTS;
2773 }
2774 /*CLIENT CA FILE (With Certificate Chain)*/
2775 if (SSL_CTX_use_certificate_chain_file(*ctx, fd->cert->cert) != 1) {
2776 SSL_CTX_free(*ctx);
2777 LOG_ERROR(LOGCALLBACK(zh), "Failed to load client certificate chain from %s", fd->cert->cert);
2778 errno = EINVAL;
2779 return ZBADARGUMENTS;
2780 }
2781 /*CLIENT PRIVATE KEY*/
2782 SSL_CTX_set_default_passwd_cb_userdata(*ctx, fd->cert->passwd);
2783 if (SSL_CTX_use_PrivateKey_file(*ctx, fd->cert->key, SSL_FILETYPE_PEM) != 1) {
2784 SSL_CTX_free(*ctx);
2785 LOG_ERROR(LOGCALLBACK(zh), "Failed to load client private key from %s", fd->cert->key);
2786 errno = EINVAL;
2787 return ZBADARGUMENTS;
2788 }
2789 /*CHECK*/
2790 if (SSL_CTX_check_private_key(*ctx) != 1) {
2791 SSL_CTX_free(*ctx);
2792 LOG_ERROR(LOGCALLBACK(zh), "SSL_CTX_check_private_key failed");
2793 errno = EINVAL;
2794 return ZBADARGUMENTS;
2795 }
2796 /*MULTIPLE HANDSHAKE*/
2797 SSL_CTX_set_mode(*ctx, SSL_MODE_AUTO_RETRY);
2798
2799 fd->ssl_sock = SSL_new(*ctx);
2800 if (fd->ssl_sock == NULL) {
2801 if (fail_on_error) {
2802 return handle_socket_error_msg(zh,__LINE__,ZSSLCONNECTIONERROR, "error creating ssl context");
2803 } else {
2804 LOG_ERROR(LOGCALLBACK(zh), "error creating ssl context");
2805 return ZSSLCONNECTIONERROR;
2806 }
2807
2808 }
2809 SSL_set_fd(fd->ssl_sock, fd->sock);
2810 }
2811 while(1) {
2812 int rc;
2813 int sock = fd->sock;
2814 struct timeval tv;
2815 fd_set s_rfds, s_wfds;
2816 tv.tv_sec = 1;
2817 tv.tv_usec = 0;
2818 FD_ZERO(&s_rfds);
2819 FD_ZERO(&s_wfds);
2820 rc = SSL_connect(fd->ssl_sock);
2821 if (rc == 1) {
2822 return ZOK;
2823 } else {
2824 rc = SSL_get_error(fd->ssl_sock, rc);
2825 if (rc == SSL_ERROR_WANT_READ) {
2826 FD_SET(sock, &s_rfds);
2827 FD_CLR(sock, &s_wfds);
2828 } else if (rc == SSL_ERROR_WANT_WRITE) {
2829 FD_SET(sock, &s_wfds);
2830 FD_CLR(sock, &s_rfds);
2831 } else {
2832 if (fail_on_error) {
2833 return handle_socket_error_msg(zh,__LINE__,ZSSLCONNECTIONERROR, "error in ssl connect");
2834 } else {
2835 LOG_ERROR(LOGCALLBACK(zh), "error in ssl connect");
2836 return ZSSLCONNECTIONERROR;
2837 }
2838 }
2839 rc = select(sock + 1, &s_rfds, &s_wfds, NULL, &tv);
2840 if (rc == -1) {
2841 if (fail_on_error) {
2842 return handle_socket_error_msg(zh,__LINE__,ZSSLCONNECTIONERROR, "error in ssl connect (after select)");
2843 } else {
2844 LOG_ERROR(LOGCALLBACK(zh), "error in ssl connect (after select)");
2845 return ZSSLCONNECTIONERROR;
2846 }
2847 }
2848 }
2849 }
2850 }
2851
2852
2853 #endif
2854
2855 /*
2856 * the "bottom half" of the session establishment procedure, executed
2857 * either after receiving the "prime response," or after SASL
2858 * authentication is complete
2859 */
2860 static void finalize_session_establishment(zhandle_t *zh) {
2861 zh->state = zh->primer_storage.readOnly ?
2862 ZOO_READONLY_STATE : ZOO_CONNECTED_STATE;
2863 zh->reconfig = 0;
2864 LOG_INFO(LOGCALLBACK(zh),
2865 "session establishment complete on server %s, sessionId=%#llx, negotiated timeout=%d %s",
2866 format_endpoint_info(&zh->addr_cur),
2867 zh->client_id.client_id, zh->recv_timeout,
2868 zh->primer_storage.readOnly ? "(READ-ONLY mode)" : "");
2869 /* we want the auth to be sent for, but since both call push to front
2870 we need to call send_watch_set first */
2871 send_set_watches(zh);
2872 /* send the authentication packet now */
2873 send_auth_info(zh);
2874 LOG_DEBUG(LOGCALLBACK(zh), "Calling a watcher for a ZOO_SESSION_EVENT and the state=ZOO_CONNECTED_STATE");
2875 zh->input_buffer = 0; // just in case the watcher calls zookeeper_process() again
2876 PROCESS_SESSION_EVENT(zh, zh->state);
2877
2878 if (has_sasl_client(zh)) {
2879 /* some packets might have been delayed during SASL negotiaton. */
2880 adaptor_send_queue(zh, 0);
2881 }
2882 }
2883
2884 #ifdef HAVE_CYRUS_SASL_H
2885
2886 /*
2887 * queue an encoded SASL request to ZooKeeper. The packet is added to
2888 * the front of the queue.
2889 *
2890 * \param zh the ZooKeeper handle
2891 * \param client_data the encoded SASL data, ready to send
2892 * \param client_data_len the length of \c client_data
2893 * \return ZOK on success, or ZMARSHALLINGERROR if something went wrong
2894 */
2895 int queue_sasl_request(zhandle_t *zh, const char *client_data, int client_data_len)
2896 {
2897 struct oarchive *oa;
2898 int rc;
2899
2900 /* Java client use normal xid, too. */
2901 struct RequestHeader h = { get_xid(), ZOO_SASL_OP };
2902 struct GetSASLRequest req = { { client_data_len, client_data_len>0 ? (char *) client_data : "" } };
2903
2904 oa = create_buffer_oarchive();
2905 rc = serialize_RequestHeader(oa, "header", &h);
2906 rc = rc < 0 ? rc : serialize_GetSASLRequest(oa, "req", &req);
2907 rc = rc < 0 ? rc : queue_front_buffer_bytes(&zh->to_send, get_buffer(oa),
2908 get_buffer_len(oa));
2909 close_buffer_oarchive(&oa, 0);
2910
2911 LOG_DEBUG(LOGCALLBACK(zh),
2912 "SASL: Queued request len=%d rc=%d", client_data_len, rc);
2913
2914 return (rc < 0) ? ZMARSHALLINGERROR : ZOK;
2915 }
2916
2917 /*
2918 * decode an expected SASL response and perform the corresponding
2919 * authentication step
2920 */
2921 static int process_sasl_response(zhandle_t *zh, char *buffer, int len)
2922 {
2923 struct iarchive *ia = create_buffer_iarchive(buffer, len);
2924 struct ReplyHeader hdr;
2925 struct SetSASLResponse res;
2926 int rc;
2927
2928 rc = ia ? ZOK : ZSYSTEMERROR;
2929 rc = rc < 0 ? rc : deserialize_ReplyHeader(ia, "hdr", &hdr);
2930 rc = rc < 0 ? rc : deserialize_SetSASLResponse(ia, "reply", &res);
2931 rc = rc < 0 ? rc : zoo_sasl_client_step(zh, res.token.buff, res.token.len);
2932 deallocate_SetSASLResponse(&res);
2933 if (ia) {
2934 close_buffer_iarchive(&ia);
2935 }
2936
2937 LOG_DEBUG(LOGCALLBACK(zh),
2938 "SASL: Processed response len=%d rc=%d", len, rc);
2939
2940 return rc;
2941 }
2942
2943 #endif /* HAVE_CYRUS_SASL_H */
2944
2945 static int check_events(zhandle_t *zh, int events)
2946 {
2947 if (zh->fd->sock == -1)
2948 return ZINVALIDSTATE;
2949
2950 #ifdef HAVE_OPENSSL_H
2951 if ((events&ZOOKEEPER_WRITE) && (zh->state == ZOO_SSL_CONNECTING_STATE) && zh->fd->cert != NULL) {
2952 int rc, error;
2953 socklen_t len = sizeof(error);
2954 rc = getsockopt(zh->fd->sock, SOL_SOCKET, SO_ERROR, &error, &len);
2955 /* the description in section 16.4 "Non-blocking connect"
2956 * in UNIX Network Programming vol 1, 3rd edition, points out
2957 * that sometimes the error is in errno and sometimes in error */
2958 if (rc < 0 || error) {
2959 if (rc == 0)
2960 errno = error;
2961 return handle_socket_error_msg(zh, __LINE__,ZCONNECTIONLOSS,
2962 "server refused to accept the client");
2963 }
2964 // We do SSL_connect() here
2965 if (init_ssl_for_handler(zh) != ZOK) {
2966 return ZSSLCONNECTIONERROR;
2967 }
2968 }
2969 #endif
2970
2971 if ((events&ZOOKEEPER_WRITE)&&(zh->state == ZOO_CONNECTING_STATE)) {
2972 int rc, error;
2973 socklen_t len = sizeof(error);
2974 rc = getsockopt(zh->fd->sock, SOL_SOCKET, SO_ERROR, &error, &len);
2975 /* the description in section 16.4 "Non-blocking connect"
2976 * in UNIX Network Programming vol 1, 3rd edition, points out
2977 * that sometimes the error is in errno and sometimes in error */
2978 if (rc < 0 || error) {
2979 if (rc == 0)
2980 errno = error;
2981 return handle_socket_error_msg(zh, __LINE__,ZCONNECTIONLOSS,
2982 "server refused to accept the client");
2983 }
2984
2985 if((rc=prime_connection(zh))!=0)
2986 return rc;
2987
2988 LOG_INFO(LOGCALLBACK(zh), "initiated connection to server %s", format_endpoint_info(&zh->addr_cur));
2989 return ZOK;
2990 }
2991
2992 if (zh->to_send.head && (events&ZOOKEEPER_WRITE)) {
2993 /* make the flush call non-blocking by specifying a 0 timeout */
2994 int rc=flush_send_queue(zh,0);
2995 if (rc < 0)
2996 return handle_socket_error_msg(zh,__LINE__,ZCONNECTIONLOSS,
2997 "failed while flushing send queue");
2998 }
2999 if (events&ZOOKEEPER_READ) {
3000 int rc;
3001 if (zh->input_buffer == 0) {
3002 zh->input_buffer = allocate_buffer(0,0);
3003 }
3004
3005 rc = recv_buffer(zh, zh->input_buffer);
3006 if (rc < 0) {
3007 return handle_socket_error_msg(zh, __LINE__,ZCONNECTIONLOSS,
3008 "failed while receiving a server response");
3009 }
3010 if (rc > 0) {
3011 get_system_time(&zh->last_recv);
3012 if (zh->input_buffer != &zh->primer_buffer) {
3013 if (is_connected(zh) || !is_sasl_auth_in_progress(zh)) {
3014 queue_buffer(&zh->to_process, zh->input_buffer, 0);
3015 #ifdef HAVE_CYRUS_SASL_H
3016 } else {
3017 rc = process_sasl_response(zh, zh->input_buffer->buffer, zh->input_buffer->curr_offset);
3018 free_buffer(zh->input_buffer);
3019 if (rc < 0) {
3020 zoo_sasl_mark_failed(zh);
3021 return rc;
3022 } else if (zh->sasl_client->state == ZOO_SASL_COMPLETE) {
3023 /*
3024 * SASL authentication just completed; send
3025 * watches, auth. info, etc. now.
3026 */
3027 finalize_session_establishment(zh);
3028 }
3029 #endif /* HAVE_CYRUS_SASL_H */
3030 }
3031 } else {
3032 int64_t oldid, newid;
3033 //deserialize
3034 deserialize_prime_response(&zh->primer_storage, zh->primer_buffer.buffer);
3035 /* We are processing the primer_buffer, so we need to finish
3036 * the connection handshake */
3037 oldid = zh->seen_rw_server_before ? zh->client_id.client_id : 0;
3038 zh->seen_rw_server_before |= !zh->primer_storage.readOnly;
3039 newid = zh->primer_storage.sessionId;
3040 if (oldid != 0 && oldid != newid) {
3041 zh->state = ZOO_EXPIRED_SESSION_STATE;
3042 errno = ESTALE;
3043 return handle_socket_error_msg(zh,__LINE__,ZSESSIONEXPIRED,
3044 "sessionId=%#llx has expired.",oldid);
3045 } else {
3046 zh->recv_timeout = zh->primer_storage.timeOut;
3047 zh->client_id.client_id = newid;
3048
3049 memcpy(zh->client_id.passwd, &zh->primer_storage.passwd,
3050 sizeof(zh->client_id.passwd));
3051
3052 #ifdef HAVE_CYRUS_SASL_H
3053 if (zh->sasl_client) {
3054 /*
3055 * Start a SASL authentication session.
3056 * Watches, auth. info, etc. will be sent
3057 * after it completes.
3058 */
3059 rc = zoo_sasl_connect(zh);
3060 rc = rc < 0 ? rc : zoo_sasl_client_start(zh);
3061 if (rc < 0) {
3062 zoo_sasl_mark_failed(zh);
3063 return rc;
3064 }
3065 } else {
3066 /* Can send watches, auth. info, etc. immediately. */
3067 finalize_session_establishment(zh);
3068 }
3069 #else /* HAVE_CYRUS_SASL_H */
3070 /* Can send watches, auth. info, etc. immediately. */
3071 finalize_session_establishment(zh);
3072 #endif /* HAVE_CYRUS_SASL_H */
3073 }
3074 }
3075 zh->input_buffer = 0;
3076 } else {
3077 // zookeeper_process was called but there was nothing to read
3078 // from the socket
3079 return ZNOTHING;
3080 }
3081 }
3082 return ZOK;
3083 }
3084
3085 void api_prolog(zhandle_t* zh)
3086 {
3087 inc_ref_counter(zh,1);
3088 }
3089
3090 int api_epilog(zhandle_t *zh,int rc)
3091 {
3092 if(inc_ref_counter(zh,-1)==0 && zh->close_requested!=0)
3093 zookeeper_close(zh);
3094 return rc;
3095 }
3096
3097 //#ifdef THREADED
3098 // IO thread queues session events to be processed by the completion thread
3099 static int queue_session_event(zhandle_t *zh, int state)
3100 {
3101 int rc;
3102 struct WatcherEvent evt = { ZOO_SESSION_EVENT, state, "" };
3103 struct ReplyHeader hdr = { WATCHER_EVENT_XID, 0, 0 };
3104 struct oarchive *oa;
3105 completion_list_t *cptr;
3106
3107 if ((oa=create_buffer_oarchive())==NULL) {
3108 LOG_ERROR(LOGCALLBACK(zh), "out of memory");
3109 goto error;
3110 }
3111 rc = serialize_ReplyHeader(oa, "hdr", &hdr);
3112 rc = rc<0?rc: serialize_WatcherEvent(oa, "event", &evt);
3113 if(rc<0){
3114 close_buffer_oarchive(&oa, 1);
3115 goto error;
3116 }
3117 cptr = create_completion_entry(zh, WATCHER_EVENT_XID,-1,0,0,0,0);
3118 cptr->buffer = allocate_buffer(get_buffer(oa), get_buffer_len(oa));
3119 cptr->buffer->curr_offset = get_buffer_len(oa);
3120 if (!cptr->buffer) {
3121 free(cptr);
3122 close_buffer_oarchive(&oa, 1);
3123 goto error;
3124 }
3125 /* We queued the buffer, so don't free it */
3126 close_buffer_oarchive(&oa, 0);
3127 lock_watchers(zh);
3128 cptr->c.watcher_result = collectWatchers(zh, ZOO_SESSION_EVENT, "");
3129 unlock_watchers(zh);
3130 queue_completion(&zh->completions_to_process, cptr, 0);
3131 if (process_async(zh->outstanding_sync)) {
3132 process_completions(zh);
3133 }
3134 return ZOK;
3135 error:
3136 errno=ENOMEM;
3137 return ZSYSTEMERROR;
3138 }
3139 //#endif
3140
3141 completion_list_t *dequeue_completion(completion_head_t *list)
3142 {
3143 completion_list_t *cptr;
3144 lock_completion_list(list);
3145 cptr = list->head;
3146 if (cptr) {
3147 list->head = cptr->next;
3148 if (!list->head) {
3149 assert(list->last == cptr);
3150 list->last = 0;
3151 }
3152 }
3153 unlock_completion_list(list);
3154 return cptr;
3155 }
3156
3157 // cleanup completion list of a failed multi request
3158 static void cleanup_failed_multi(zhandle_t *zh, int xid, int rc, completion_list_t *cptr) {
3159 completion_list_t *entry;
3160 completion_head_t *clist = &cptr->c.clist;
3161 while ((entry = dequeue_completion(clist)) != NULL) {
3162 // Fake failed response for all sub-requests
3163 deserialize_response(zh, entry->c.type, xid, 1, rc, entry, NULL);
3164 destroy_completion_entry(entry);
3165 }
3166 }
3167
/*
 * Parse the body of a successful multi-op reply and dispatch one
 * sub-completion per operation.
 *
 * The body is read as a sequence of MultiHeader records, stopping at the
 * first one with done != 0. For each record, the next entry is dequeued from
 * the request's sub-completion list (cptr->c.clist) and passed to
 * deserialize_response(). A record with type == -1 is followed by an
 * ErrorResponse whose err becomes that sub-operation's result code.
 *
 * Returns 0, or the first ErrorResponse code that is neither 0 nor
 * ZRUNTIMEINCONSISTENCY. (Presumably ZRUNTIMEINCONSISTENCY marks operations
 * rolled back because a sibling failed; confirm against the server.)
 *
 * NOTE(review): return values of the deserialize_* calls are ignored. On a
 * truncated archive, mhdr could keep stale values and the loop could try to
 * dequeue past the end of clist (the assert fires). Confirm this is acceptable.
 */
static int deserialize_multi(zhandle_t *zh, int xid, completion_list_t *cptr, struct iarchive *ia)
{
    int rc = 0;
    completion_head_t *clist = &cptr->c.clist;
    struct MultiHeader mhdr = {0, 0, 0};
    assert(clist);
    deserialize_MultiHeader(ia, "multiheader", &mhdr);
    while (!mhdr.done) {
        /* assumes sub-replies arrive in the order the sub-requests were queued */
        completion_list_t *entry = dequeue_completion(clist);
        assert(entry);

        if (mhdr.type == -1) {
            /* error record: the per-op result code lives in an ErrorResponse */
            struct ErrorResponse er;
            deserialize_ErrorResponse(ia, "error", &er);
            mhdr.err = er.err ;
            /* remember only the first "real" error as the multi result */
            if (rc == 0 && er.err != 0 && er.err != ZRUNTIMEINCONSISTENCY) {
                rc = er.err;
            }
        }

        deserialize_response(zh, entry->c.type, xid, mhdr.type == -1, mhdr.err, entry, ia);
        /* read the next header before destroying the current entry */
        deserialize_MultiHeader(ia, "multiheader", &mhdr);
        //While deserializing the response we must destroy completion entry for each operation in
        //the zoo_multi transaction. Otherwise this results in memory leak when client invokes zoo_multi
        //operation.
        destroy_completion_entry(entry);
    }

    return rc;
}
3198
3199 static void deserialize_response(zhandle_t *zh, int type, int xid, int failed, int rc, completion_list_t *cptr, struct iarchive *ia)
3200 {
3201 switch (type) {
3202 case COMPLETION_DATA:
3203 LOG_DEBUG(LOGCALLBACK(zh), "Calling COMPLETION_DATA for xid=%#x failed=%d rc=%d",
3204 cptr->xid, failed, rc);
3205 if (failed) {
3206 cptr->c.data_result(rc, 0, 0, 0, cptr->data);
3207 } else {
3208 struct GetDataResponse res;
3209 deserialize_GetDataResponse(ia, "reply", &res);
3210 cptr->c.data_result(rc, res.data.buff, res.data.len,
3211 &res.stat, cptr->data);
3212 deallocate_GetDataResponse(&res);
3213 }
3214 break;
3215 case COMPLETION_STAT:
3216 LOG_DEBUG(LOGCALLBACK(zh), "Calling COMPLETION_STAT for xid=%#x failed=%d rc=%d",
3217 cptr->xid, failed, rc);
3218 if (failed) {
3219 cptr->c.stat_result(rc, 0, cptr->data);
3220 } else {
3221 struct SetDataResponse res;
3222 deserialize_SetDataResponse(ia, "reply", &res);
3223 cptr->c.stat_result(rc, &res.stat, cptr->data);
3224 deallocate_SetDataResponse(&res);
3225 }
3226 break;
3227 case COMPLETION_STRINGLIST:
3228 LOG_DEBUG(LOGCALLBACK(zh), "Calling COMPLETION_STRINGLIST for xid=%#x failed=%d rc=%d",
3229 cptr->xid, failed, rc);
3230 if (failed) {
3231 cptr->c.strings_result(rc, 0, cptr->data);
3232 } else {
3233 struct GetChildrenResponse res;
3234 deserialize_GetChildrenResponse(ia, "reply", &res);
3235 cptr->c.strings_result(rc, &res.children, cptr->data);
3236 deallocate_GetChildrenResponse(&res);
3237 }
3238 break;
3239 case COMPLETION_STRINGLIST_STAT:
3240 LOG_DEBUG(LOGCALLBACK(zh), "Calling COMPLETION_STRINGLIST_STAT for xid=%#x failed=%d rc=%d",
3241 cptr->xid, failed, rc);
3242 if (failed) {
3243 cptr->c.strings_stat_result(rc, 0, 0, cptr->data);
3244 } else {
3245 struct GetChildren2Response res;
3246 deserialize_GetChildren2Response(ia, "reply", &res);
3247 cptr->c.strings_stat_result(rc, &res.children, &res.stat, cptr->data);
3248 deallocate_GetChildren2Response(&res);
3249 }
3250 break;
3251 case COMPLETION_STRING:
3252 LOG_DEBUG(LOGCALLBACK(zh), "Calling COMPLETION_STRING for xid=%#x failed=%d, rc=%d",
3253 cptr->xid, failed, rc);
3254 if (failed) {
3255 cptr->c.string_result(rc, 0, cptr->data);
3256 } else {
3257 struct CreateResponse res;
3258 const char *client_path;
3259 memset(&res, 0, sizeof(res));
3260 deserialize_CreateResponse(ia, "reply", &res);
3261 client_path = sub_string(zh, res.path);
3262 cptr->c.string_result(rc, client_path, cptr->data);
3263 free_duplicate_path(client_path, res.path);
3264 deallocate_CreateResponse(&res);
3265 }
3266 break;
3267 case COMPLETION_STRING_STAT:
3268 LOG_DEBUG(LOGCALLBACK(zh), "Calling COMPLETION_STRING_STAT for xid=%#x failed=%d, rc=%d",
3269 cptr->xid, failed, rc);
3270 if (failed) {
3271 cptr->c.string_stat_result(rc, 0, 0, cptr->data);
3272 } else {
3273 struct Create2Response res;
3274 const char *client_path;
3275 deserialize_Create2Response(ia, "reply", &res);
3276 client_path = sub_string(zh, res.path);
3277 cptr->c.string_stat_result(rc, client_path, &res.stat, cptr->data);
3278 free_duplicate_path(client_path, res.path);
3279 deallocate_Create2Response(&res);
3280 }
3281 break;
3282 case COMPLETION_ACLLIST:
3283 LOG_DEBUG(LOGCALLBACK(zh), "Calling COMPLETION_ACLLIST for xid=%#x failed=%d rc=%d",
3284 cptr->xid, failed, rc);
3285 if (failed) {
3286 cptr->c.acl_result(rc, 0, 0, cptr->data);
3287 } else {
3288 struct GetACLResponse res;
3289 deserialize_GetACLResponse(ia, "reply", &res);
3290 cptr->c.acl_result(rc, &res.acl, &res.stat, cptr->data);
3291 deallocate_GetACLResponse(&res);
3292 }
3293 break;
3294 case COMPLETION_VOID:
3295 LOG_DEBUG(LOGCALLBACK(zh), "Calling COMPLETION_VOID for xid=%#x failed=%d rc=%d",
3296 cptr->xid, failed, rc);
3297 assert(cptr->c.void_result);
3298 cptr->c.void_result(rc, cptr->data);
3299 break;
3300 case COMPLETION_MULTI:
3301 LOG_DEBUG(LOGCALLBACK(zh), "Calling COMPLETION_MULTI for xid=%#x failed=%d rc=%d",
3302 cptr->xid, failed, rc);
3303 assert(cptr->c.void_result);
3304 if (failed) {
3305 cleanup_failed_multi(zh, xid, rc, cptr);
3306 } else {
3307 rc = deserialize_multi(zh, xid, cptr, ia);
3308 }
3309 cptr->c.void_result(rc, cptr->data);
3310 break;
3311 default:
3312 LOG_DEBUG(LOGCALLBACK(zh), "Unsupported completion type=%d", cptr->c.type);
3313 }
3314 }
3315
3316
3317 /* handles async completion (both single- and multithreaded) */
3318 void process_completions(zhandle_t *zh)
3319 {
3320 completion_list_t *cptr;
3321 while ((cptr = dequeue_completion(&zh->completions_to_process)) != 0) {
3322 struct ReplyHeader hdr;
3323 buffer_list_t *bptr = cptr->buffer;
3324 struct iarchive *ia = create_buffer_iarchive(bptr->buffer,
3325 bptr->len);
3326 deserialize_ReplyHeader(ia, "hdr", &hdr);
3327
3328 if (hdr.xid == WATCHER_EVENT_XID) {
3329 int type, state;
3330 struct WatcherEvent evt;
3331 deserialize_WatcherEvent(ia, "event", &evt);
3332 /* We are doing a notification, so there is no pending request */
3333 type = evt.type;
3334 state = evt.state;
3335 /* This is a notification so there aren't any pending requests */
3336 LOG_DEBUG(LOGCALLBACK(zh), "Calling a watcher for node [%s], type = %d event=%s",
3337 (evt.path==NULL?"NULL":evt.path), cptr->c.type,
3338 watcherEvent2String(type));
3339 deliverWatchers(zh,type,state,evt.path, &cptr->c.watcher_result);
3340 deallocate_WatcherEvent(&evt);
3341 } else {
3342 deserialize_response(zh, cptr->c.type, hdr.xid, hdr.err != 0, hdr.err, cptr, ia);
3343 }
3344 destroy_completion_entry(cptr);
3345 close_buffer_iarchive(&ia);
3346 }
3347 }
3348
3349 static void isSocketReadable(zhandle_t* zh)
3350 {
3351 #ifndef _WIN32
3352 struct pollfd fds;
3353 fds.fd = zh->fd->sock;
3354 fds.events = POLLIN;
3355 if (poll(&fds,1,0)<=0) {
3356 // socket not readable -- no more responses to process
3357 zh->socket_readable.tv_sec=zh->socket_readable.tv_usec=0;
3358 }
3359 #else
3360 fd_set rfds;
3361 struct timeval waittime = {0, 0};
3362 FD_ZERO(&rfds);
3363 FD_SET( zh->fd , &rfds);
3364 if (select(0, &rfds, NULL, NULL, &waittime) <= 0){
3365 // socket not readable -- no more responses to process
3366 zh->socket_readable.tv_sec=zh->socket_readable.tv_usec=0;
3367 }
3368 #endif
3369 else{
3370 get_system_time(&zh->socket_readable);
3371 }
3372 }
3373
3374 static void checkResponseLatency(zhandle_t* zh)
3375 {
3376 int delay;
3377 struct timeval now;
3378
3379 if(zh->socket_readable.tv_sec==0)
3380 return;
3381
3382 get_system_time(&now);
3383 delay=calculate_interval(&zh->socket_readable, &now);
3384 if(delay>20)
3385 LOG_DEBUG(LOGCALLBACK(zh), "The following server response has spent at least %dms sitting in the client socket recv buffer",delay);
3386
3387 zh->socket_readable.tv_sec=zh->socket_readable.tv_usec=0;
3388 }
3389
3390 int zookeeper_process(zhandle_t *zh, int events)
3391 {
3392 buffer_list_t *bptr;
3393 int rc;
3394
3395 if (zh==NULL)
3396 return ZBADARGUMENTS;
3397 if (is_unrecoverable(zh))
3398 return ZINVALIDSTATE;
3399 api_prolog(zh);
3400 IF_DEBUG(checkResponseLatency(zh));
3401 rc = check_events(zh, events);
3402 if (rc!=ZOK)
3403 return api_epilog(zh, rc);
3404
3405 IF_DEBUG(isSocketReadable(zh));
3406
3407 while (rc >= 0 && (bptr=dequeue_buffer(&zh->to_process))) {
3408 struct ReplyHeader hdr;
3409 struct iarchive *ia = create_buffer_iarchive(
3410 bptr->buffer, bptr->curr_offset);
3411 deserialize_ReplyHeader(ia, "hdr", &hdr);
3412
3413 if (hdr.xid == PING_XID) {
3414 // Ping replies can arrive out-of-order
3415 int elapsed = 0;
3416 struct timeval now;
3417 gettimeofday(&now, 0);
3418 elapsed = calculate_interval(&zh->last_ping, &now);
3419 LOG_DEBUG(LOGCALLBACK(zh), "Got ping response in %d ms", elapsed);
3420 free_buffer(bptr);
3421 } else if (hdr.xid == WATCHER_EVENT_XID) {
3422 struct WatcherEvent evt;
3423 int type = 0;
3424 char *path = NULL;
3425 completion_list_t *c = NULL;
3426
3427 LOG_DEBUG(LOGCALLBACK(zh), "Processing WATCHER_EVENT");
3428
3429 deserialize_WatcherEvent(ia, "event", &evt);
3430 type = evt.type;
3431 path = evt.path;
3432 /* We are doing a notification, so there is no pending request */
3433 c = create_completion_entry(zh, WATCHER_EVENT_XID,-1,0,0,0,0);
3434 c->buffer = bptr;
3435 lock_watchers(zh);
3436 c->c.watcher_result = collectWatchers(zh, type, path);
3437 unlock_watchers(zh);
3438
3439 // We cannot free until now, otherwise path will become invalid
3440 deallocate_WatcherEvent(&evt);
3441 queue_completion(&zh->completions_to_process, c, 0);
3442 } else if (hdr.xid == SET_WATCHES_XID) {
3443 LOG_DEBUG(LOGCALLBACK(zh), "Processing SET_WATCHES");
3444 free_buffer(bptr);
3445 } else if (hdr.xid == AUTH_XID){
3446 LOG_DEBUG(LOGCALLBACK(zh), "Processing AUTH_XID");
3447
3448 /* special handling for the AUTH response as it may come back
3449 * out-of-band */
3450 auth_completion_func(hdr.err,zh);
3451 free_buffer(bptr);
3452 /* authentication completion may change the connection state to
3453 * unrecoverable */
3454 if(is_unrecoverable(zh)){
3455 handle_error(zh, ZAUTHFAILED);
3456 close_buffer_iarchive(&ia);
3457 return api_epilog(zh, ZAUTHFAILED);
3458 }
3459 } else {
3460 int rc = hdr.err;
3461 /* Find the request corresponding to the response */
3462 completion_list_t *cptr = dequeue_completion(&zh->sent_requests);
3463
3464 /* [ZOOKEEPER-804] Don't assert if zookeeper_close has been called. */
3465 if (zh->close_requested == 1 && cptr == NULL) {
3466 LOG_DEBUG(LOGCALLBACK(zh), "Completion queue has been cleared by zookeeper_close()");
3467 close_buffer_iarchive(&ia);
3468 free_buffer(bptr);
3469 return api_epilog(zh,ZINVALIDSTATE);
3470 }
3471 assert(cptr);
3472 /* The requests are going to come back in order */
3473 if (cptr->xid != hdr.xid) {
3474 LOG_DEBUG(LOGCALLBACK(zh), "Processing unexpected or out-of-order response!");
3475
3476 // received unexpected (or out-of-order) response
3477 close_buffer_iarchive(&ia);
3478 free_buffer(bptr);
3479 // put the completion back on the queue (so it gets properly
3480 // signaled and deallocated) and disconnect from the server
3481 queue_completion(&zh->sent_requests,cptr,1);
3482 return api_epilog(zh,
3483 handle_socket_error_msg(zh, __LINE__,ZRUNTIMEINCONSISTENCY,
3484 "unexpected server response: expected %#x, but received %#x",
3485 hdr.xid,cptr->xid));
3486 }
3487
3488 if (hdr.zxid > 0) {
3489 // Update last_zxid only when it is a request response
3490 zh->last_zxid = hdr.zxid;
3491 }
3492 lock_watchers(zh);
3493 activateWatcher(zh, cptr->watcher, rc);
3494 deactivateWatcher(zh, cptr->watcher_deregistration, rc);
3495 unlock_watchers(zh);
3496
3497 if (cptr->c.void_result != SYNCHRONOUS_MARKER) {
3498 LOG_DEBUG(LOGCALLBACK(zh), "Queueing asynchronous response");
3499 cptr->buffer = bptr;
3500 queue_completion(&zh->completions_to_process, cptr, 0);
3501 } else {
3502 #ifdef THREADED
3503 struct sync_completion
3504 *sc = (struct sync_completion*)cptr->data;
3505 sc->rc = rc;
3506
3507 process_sync_completion(zh, cptr, sc, ia);
3508
3509 notify_sync_completion(sc);
3510 free_buffer(bptr);
3511 zh->outstanding_sync--;
3512 destroy_completion_entry(cptr);
3513 #else
3514 abort_singlethreaded(zh);
3515 #endif
3516 }
3517 }
3518
3519 close_buffer_iarchive(&ia);
3520
3521 }
3522 if (process_async(zh->outstanding_sync)) {
3523 process_completions(zh);
3524 }
3525
3526 return api_epilog(zh, ZOK);
3527 }
3528
3529 int zoo_state(zhandle_t *zh)
3530 {
3531 if(zh!=0)
3532 return zh->state;
3533 return 0;
3534 }
3535
3536 static watcher_registration_t* create_watcher_registration(const char* path,
3537 result_checker_fn checker,watcher_fn watcher,void* ctx){
3538 watcher_registration_t* wo;
3539 if(watcher==0)
3540 return 0;
3541 wo=calloc(1,sizeof(watcher_registration_t));
3542 wo->path=strdup(path);
3543 wo->watcher=watcher;
3544 wo->context=ctx;
3545 wo->checker=checker;
3546 return wo;
3547 }
3548
3549 static watcher_deregistration_t* create_watcher_deregistration(const char* path,
3550 watcher_fn watcher, void *watcherCtx, ZooWatcherType wtype) {
3551 watcher_deregistration_t *wdo;
3552
3553 wdo = calloc(1, sizeof(watcher_deregistration_t));
3554 if (!wdo) {
3555 return NULL;
3556 }
3557 wdo->path = strdup(path);
3558 wdo->watcher = watcher;
3559 wdo->context = watcherCtx;
3560 wdo->type = wtype;
3561 return wdo;
3562 }
3563
3564 static void destroy_watcher_registration(watcher_registration_t* wo){
3565 if(wo!=0){
3566 free((void*)wo->path);
3567 free(wo);
3568 }
3569 }
3570
3571 static void destroy_watcher_deregistration(watcher_deregistration_t *wdo) {
3572 if (wdo) {
3573 free((void *)wdo->path);
3574 free(wdo);
3575 }
3576 }
3577
3578 static completion_list_t* create_completion_entry(zhandle_t *zh, int xid, int completion_type,
3579 const void *dc, const void *data,watcher_registration_t* wo, completion_head_t *clist)
3580 {
3581 return do_create_completion_entry(zh, xid, completion_type, dc, data, wo,
3582 clist, NULL);
3583 }
3584
3585 static completion_list_t* create_completion_entry_deregistration(zhandle_t *zh,
3586 int xid, int completion_type, const void *dc, const void *data,
3587 watcher_deregistration_t* wdo, completion_head_t *clist)
3588 {
3589 return do_create_completion_entry(zh, xid, completion_type, dc, data, NULL,
3590 clist, wdo);
3591 }
3592
3593 static completion_list_t* do_create_completion_entry(zhandle_t *zh, int xid,
3594 int completion_type, const void *dc, const void *data,
3595 watcher_registration_t* wo, completion_head_t *clist,
3596 watcher_deregistration_t* wdo)
3597 {
3598 completion_list_t *c = calloc(1, sizeof(completion_list_t));
3599 if (!c) {
3600 LOG_ERROR(LOGCALLBACK(zh), "out of memory");
3601 return 0;
3602 }
3603 c->c.type = completion_type;
3604 c->data = data;
3605 switch(c->c.type) {
3606 case COMPLETION_VOID:
3607 c->c.void_result = (void_completion_t)dc;
3608 break;
3609 case COMPLETION_STRING:
3610 c->c.string_result = (string_completion_t)dc;
3611 break;
3612 case COMPLETION_DATA:
3613 c->c.data_result = (data_completion_t)dc;
3614 break;
3615 case COMPLETION_STAT:
3616 c->c.stat_result = (stat_completion_t)dc;
3617 break;
3618 case COMPLETION_STRINGLIST:
3619 c->c.strings_result = (strings_completion_t)dc;
3620 break;
3621 case COMPLETION_STRINGLIST_STAT:
3622 c->c.strings_stat_result = (strings_stat_completion_t)dc;
3623 break;
3624 case COMPLETION_STRING_STAT:
3625 c->c.string_stat_result = (string_stat_completion_t)dc;
3626 case COMPLETION_ACLLIST:
3627 c->c.acl_result = (acl_completion_t)dc;
3628 break;
3629 case COMPLETION_MULTI:
3630 assert(clist);
3631 c->c.void_result = (void_completion_t)dc;
3632 c->c.clist = *clist;
3633 break;
3634 }
3635 c->xid = xid;
3636 c->watcher = wo;
3637 c->watcher_deregistration = wdo;
3638
3639 return c;
3640 }
3641
3642 static void destroy_completion_entry(completion_list_t* c){
3643 if(c!=0){
3644 destroy_watcher_registration(c->watcher);
3645 destroy_watcher_deregistration(c->watcher_deregistration);
3646 if(c->buffer!=0)
3647 free_buffer(c->buffer);
3648 free(c);
3649 }
3650 }
3651
3652 static void queue_completion_nolock(completion_head_t *list,
3653 completion_list_t *c,
3654 int add_to_front)
3655 {
3656 c->next = 0;
3657 /* appending a new entry to the back of the list */
3658 if (list->last) {
3659 assert(list->head);
3660 // List is not empty
3661 if (!add_to_front) {
3662 list->last->next = c;
3663 list->last = c;
3664 } else {
3665 c->next = list->head;
3666 list->head = c;
3667 }
3668 } else {
3669 // List is empty
3670 assert(!list->head);
3671 list->head = c;
3672 list->last = c;
3673 }
3674 }
3675
3676 static void queue_completion(completion_head_t *list, completion_list_t *c,
3677 int add_to_front)
3678 {
3679
3680 lock_completion_list(list);
3681 queue_completion_nolock(list, c, add_to_front);
3682 unlock_completion_list(list);
3683 }
3684
3685 static int add_completion(zhandle_t *zh, int xid, int completion_type,
3686 const void *dc, const void *data, int add_to_front,
3687 watcher_registration_t* wo, completion_head_t *clist)
3688 {
3689 completion_list_t *c =create_completion_entry(zh, xid, completion_type, dc,
3690 data, wo, clist);
3691 return do_add_completion(zh, dc, c, add_to_front);
3692 }
3693
3694 static int add_completion_deregistration(zhandle_t *zh, int xid,
3695 int completion_type, const void *dc, const void *data, int add_to_front,
3696 watcher_deregistration_t* wdo, completion_head_t *clist)
3697 {
3698 completion_list_t *c = create_completion_entry_deregistration(zh, xid,
3699 completion_type, dc, data, wdo, clist);
3700 return do_add_completion(zh, dc, c, add_to_front);
3701 }
3702
3703 static int do_add_completion(zhandle_t *zh, const void *dc,
3704 completion_list_t *c, int add_to_front)
3705 {
3706 int rc = 0;
3707 if (!c)
3708 return ZSYSTEMERROR;
3709 lock_completion_list(&zh->sent_requests);
3710 if (zh->close_requested != 1) {
3711 queue_completion_nolock(&zh->sent_requests, c, add_to_front);
3712 if (dc == SYNCHRONOUS_MARKER) {
3713 zh->outstanding_sync++;
3714 }
3715 rc = ZOK;
3716 } else {
3717 free(c);
3718 rc = ZINVALIDSTATE;
3719 }
3720 unlock_completion_list(&zh->sent_requests);
3721 return rc;
3722 }
3723
3724 static int add_data_completion(zhandle_t *zh, int xid, data_completion_t dc,
3725 const void *data,watcher_registration_t* wo)
3726 {
3727 return add_completion(zh, xid, COMPLETION_DATA, dc, data, 0, wo, 0);
3728 }
3729
3730 static int add_stat_completion(zhandle_t *zh, int xid, stat_completion_t dc,
3731 const void *data,watcher_registration_t* wo)
3732 {
3733 return add_completion(zh, xid, COMPLETION_STAT, dc, data, 0, wo, 0);
3734 }
3735
3736 static int add_strings_completion(zhandle_t *zh, int xid,
3737 strings_completion_t dc, const void *data,watcher_registration_t* wo)
3738 {
3739 return add_completion(zh, xid, COMPLETION_STRINGLIST, dc, data, 0, wo, 0);
3740 }
3741
3742 static int add_strings_stat_completion(zhandle_t *zh, int xid,
3743 strings_stat_completion_t dc, const void *data,watcher_registration_t* wo)
3744 {
3745 return add_completion(zh, xid, COMPLETION_STRINGLIST_STAT, dc, data, 0, wo, 0);
3746 }
3747
3748 static int add_acl_completion(zhandle_t *zh, int xid, acl_completion_t dc,
3749 const void *data)
3750 {
3751 return add_completion(zh, xid, COMPLETION_ACLLIST, dc, data, 0, 0, 0);
3752 }
3753
3754 static int add_void_completion(zhandle_t *zh, int xid, void_completion_t dc,
3755 const void *data)
3756 {
3757 return add_completion(zh, xid, COMPLETION_VOID, dc, data, 0, 0, 0);
3758 }
3759
3760 static int add_string_completion(zhandle_t *zh, int xid,
3761 string_completion_t dc, const void *data)
3762 {
3763 return add_completion(zh, xid, COMPLETION_STRING, dc, data, 0, 0, 0);
3764 }
3765
3766 static int add_string_stat_completion(zhandle_t *zh, int xid,
3767 string_stat_completion_t dc, const void *data)
3768 {
3769 return add_completion(zh, xid, COMPLETION_STRING_STAT, dc, data, 0, 0, 0);
3770 }
3771
3772 static int add_multi_completion(zhandle_t *zh, int xid, void_completion_t dc,
3773 const void *data, completion_head_t *clist)
3774 {
3775 return add_completion(zh, xid, COMPLETION_MULTI, dc, data, 0,0, clist);
3776 }
3777
3778 /**
3779 * After sending the close request, we are waiting for a given millisecs for
3780 * getting the answer and/or for the socket to be closed by the server.
3781 *
3782 * This function should not be called while we still want to process
3783 * any response from the server. It must be called after adaptor_finish called,
3784 * in order not to mess with the I/O receiver thread in multi-threaded mode.
3785 */
3786 int wait_for_session_to_be_closed(zhandle_t *zh, int timeout_ms)
3787 {
3788 int ret = 0;
3789 #ifndef WIN32
3790 struct pollfd fd_s[1];
3791 #else
3792 fd_set rfds;
3793 struct timeval waittime = {timeout_ms / 1000, (timeout_ms % 1000) * 1000};
3794 #endif
3795
3796 if (zh == NULL) {
3797 return ZBADARGUMENTS;
3798 }
3799
3800 #ifndef WIN32
3801 fd_s[0].fd = zh->fd->sock;
3802 fd_s[0].events = POLLIN;
3803 ret = poll(fd_s, 1, timeout_ms);
3804 #else
3805 FD_ZERO(&rfds);
3806 FD_SET(zh->fd->sock , &rfds);
3807 ret = select(zh->fd->sock + 1, &rfds, NULL, NULL, &waittime);
3808 #endif
3809
3810 if (ret == 0){
3811 LOG_WARN(LOGCALLBACK(zh), "Timed out (%dms) during waiting for server's reply after sending a close request, sessionId=%#llx\n",
3812 timeout_ms, zh->client_id.client_id);
3813 } else if (ret < 0) {
3814 LOG_WARN(LOGCALLBACK(zh), "System error (%d) happened while waiting for server's reply, sessionId=%#llx\n",
3815 ret, zh->client_id.client_id);
3816 }
3817
3818 return ZOK;
3819 }
3820
3821 int zookeeper_close(zhandle_t *zh)
3822 {
3823 int rc=ZOK;
3824 if (zh==0)
3825 return ZBADARGUMENTS;
3826
3827 zh->close_requested=1;
3828 if (inc_ref_counter(zh,1)>1) {
3829 /* We have incremented the ref counter to prevent the
3830 * completions from calling zookeeper_close before we have
3831 * completed the adaptor_finish call below. */
3832
3833 /* Signal any syncronous completions before joining the threads */
3834 enter_critical(zh);
3835 free_completions(zh,1,ZCLOSING);
3836 leave_critical(zh);
3837
3838 adaptor_finish(zh);
3839 /* Now we can allow the handle to be cleaned up, if the completion
3840 * threads finished during the adaptor_finish call. */
3841 api_epilog(zh, 0);
3842 return ZOK;
3843 }
3844 /* No need to decrement the counter since we're just going to
3845 * destroy the handle later. */
3846 if (is_connected(zh)) {
3847 struct oarchive *oa;
3848 struct RequestHeader h = {get_xid(), ZOO_CLOSE_OP};
3849 LOG_INFO(LOGCALLBACK(zh), "Closing zookeeper sessionId=%#llx to %s\n",
3850 zh->client_id.client_id, zoo_get_current_server(zh));
3851 oa = create_buffer_oarchive();
3852 rc = serialize_RequestHeader(oa, "header", &h);
3853 rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa), get_buffer_len(oa));
3854 /* We queued the buffer, so don't free it */
3855 close_buffer_oarchive(&oa, 0);
3856 if (rc < 0) {
3857 LOG_DEBUG(LOGCALLBACK(zh), "Error during closing zookeeper session, sessionId=%#llx to %s (error: %d)\n",
3858 zh->client_id.client_id, zoo_get_current_server(zh), rc);
3859 rc = ZMARSHALLINGERROR;
3860 } else {
3861 /* make sure the close request is sent; we set timeout to an arbitrary
3862 * (but reasonable) number of milliseconds since we want the call to block*/
3863 rc = adaptor_send_queue(zh, 3000);
3864
3865 /* give some time to the server to process the session close request properly */
3866 rc = rc < 0 ? rc : wait_for_session_to_be_closed(zh, 1500);
3867 }
3868 } else {
3869 rc = ZOK;
3870 }
3871
3872 LOG_INFO(LOGCALLBACK(zh), "Freeing zookeeper resources for sessionId=%#llx\n", zh->client_id.client_id);
3873 destroy(zh);
3874 adaptor_destroy(zh);
3875 free(zh->fd);
3876 free(zh);
3877 #ifdef _WIN32
3878 Win32WSACleanup();
3879 #endif
3880 return rc;
3881 }
3882
/*
 * Client-side znode path validation. Returns 1 if `path` is acceptable and
 * 0 otherwise.
 *
 * A valid path:
 *   - is non-NULL, non-empty and starts with '/';
 *   - has no empty components ("//");
 *   - has no "." or ".." components;
 *   - contains no control characters in 0x01..0x1F;
 *   - does not end in '/' (except the root "/").
 * For sequential create modes (ZOOKEEPER_IS_SEQUENCE(mode)), a trailing '/',
 * '.' or ".." component is tolerated, since the server appends a sequence
 * suffix. The control-character range now includes 0x1F, matching the
 * documented znode naming rules (\u0001 - \u001F).
 */
static int isValidPath(const char* path, const int mode) {
    int len = 0;
    char lastc = '/';
    char c;
    int i = 0;

    if (path == 0)
        return 0;
    len = strlen(path);
    if (len == 0)
        return 0;
    if (path[0] != '/')
        return 0;
    if (len == 1) // done checking - it's the root
        return 1;
    if (path[len - 1] == '/' && !ZOOKEEPER_IS_SEQUENCE(mode))
        return 0;

    i = 1;
    for (; i < len; lastc = path[i], i++) {
        c = path[i];

        if (c == 0) {
            return 0;
        } else if (c == '/' && lastc == '/') {
            return 0;
        } else if (c == '.' && lastc == '.') {
            /* ".." component: preceded by '/' and followed by '/' or end */
            if (path[i-2] == '/' && (((i + 1 == len) && !ZOOKEEPER_IS_SEQUENCE(mode))
                                     || path[i+1] == '/')) {
                return 0;
            }
        } else if (c == '.') {
            /* "." component */
            if ((path[i-1] == '/') && (((i + 1 == len) && !ZOOKEEPER_IS_SEQUENCE(mode))
                                       || path[i+1] == '/')) {
                return 0;
            }
        } else if (c > 0x00 && c <= 0x1f) {
            return 0;
        }
    }

    return 1;
}
3926
3927 /*---------------------------------------------------------------------------*
3928 * REQUEST INIT HELPERS
3929 *---------------------------------------------------------------------------*/
3930 /* Common Request init helper functions to reduce code duplication */
3931 static int Request_path_init(zhandle_t *zh, int mode,
3932 char **path_out, const char *path)
3933 {
3934 assert(path_out);
3935
3936 *path_out = prepend_string(zh, path);
3937 if (zh == NULL || !isValidPath(*path_out, mode)) {
3938 free_duplicate_path(*path_out, path);
3939 return ZBADARGUMENTS;
3940 }
3941 if (is_unrecoverable(zh)) {
3942 free_duplicate_path(*path_out, path);
3943 return ZINVALIDSTATE;
3944 }
3945
3946 return ZOK;
3947 }
3948
3949 static int Request_path_watch_init(zhandle_t *zh, int mode,
3950 char **path_out, const char *path,
3951 int32_t *watch_out, uint32_t watch)
3952 {
3953 int rc = Request_path_init(zh, mode, path_out, path);
3954 if (rc != ZOK) {
3955 return rc;
3956 }
3957 *watch_out = watch;
3958 return ZOK;
3959 }
3960
3961 /*---------------------------------------------------------------------------*
3962 * ASYNC API
3963 *---------------------------------------------------------------------------*/
3964 int zoo_aget(zhandle_t *zh, const char *path, int watch, data_completion_t dc,
3965 const void *data)
3966 {
3967 return zoo_awget(zh,path,watch?zh->watcher:0,zh->context,dc,data);
3968 }
3969
3970 int zoo_awget(zhandle_t *zh, const char *path,
3971 watcher_fn watcher, void* watcherCtx,
3972 data_completion_t dc, const void *data)
3973 {
3974 struct oarchive *oa;
3975 char *server_path = prepend_string(zh, path);
3976 struct RequestHeader h = {get_xid(), ZOO_GETDATA_OP};
3977 struct GetDataRequest req = { (char*)server_path, watcher!=0 };
3978 int rc;
3979
3980 if (zh==0 || !isValidPath(server_path, 0)) {
3981 free_duplicate_path(server_path, path);
3982 return ZBADARGUMENTS;
3983 }
3984 if (is_unrecoverable(zh)) {
3985 free_duplicate_path(server_path, path);
3986 return ZINVALIDSTATE;
3987 }
3988 oa=create_buffer_oarchive();
3989 rc = serialize_RequestHeader(oa, "header", &h);
3990 rc = rc < 0 ? rc : serialize_GetDataRequest(oa, "req", &req);
3991 enter_critical(zh);
3992 rc = rc < 0 ? rc : add_data_completion(zh, h.xid, dc, data,
3993 create_watcher_registration(server_path,data_result_checker,watcher,watcherCtx));
3994 rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
3995 get_buffer_len(oa));
3996 leave_critical(zh);
3997 free_duplicate_path(server_path, path);
3998 /* We queued the buffer, so don't free it */
3999 close_buffer_oarchive(&oa, 0);
4000
4001 LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
4002 zoo_get_current_server(zh));
4003 /* make a best (non-blocking) effort to send the requests asap */
4004 adaptor_send_queue(zh, 0);
4005 return (rc < 0)?ZMARSHALLINGERROR:ZOK;
4006 }
4007
4008 int zoo_agetconfig(zhandle_t *zh, int watch, data_completion_t dc,
4009 const void *data)
4010 {
4011 return zoo_awgetconfig(zh,watch?zh->watcher:0,zh->context,dc,data);
4012 }
4013
4014 int zoo_awgetconfig(zhandle_t *zh, watcher_fn watcher, void* watcherCtx,
4015 data_completion_t dc, const void *data)
4016 {
4017 struct oarchive *oa;
4018 char *path = ZOO_CONFIG_NODE;
4019 char *server_path = ZOO_CONFIG_NODE;
4020 struct RequestHeader h = { get_xid(), ZOO_GETDATA_OP };
4021 struct GetDataRequest req = { (char*)server_path, watcher!=0 };
4022 int rc;
4023
4024 if (zh==0 || !isValidPath(server_path, 0)) {
4025 free_duplicate_path(server_path, path);
4026 return ZBADARGUMENTS;
4027 }
4028 if (is_unrecoverable(zh)) {
4029 free_duplicate_path(server_path, path);
4030 return ZINVALIDSTATE;
4031 }
4032 oa=create_buffer_oarchive();
4033 rc = serialize_RequestHeader(oa, "header", &h);
4034 rc = rc < 0 ? rc : serialize_GetDataRequest(oa, "req", &req);
4035 enter_critical(zh);
4036 rc = rc < 0 ? rc : add_data_completion(zh, h.xid, dc, data,
4037 create_watcher_registration(server_path,data_result_checker,watcher,watcherCtx));
4038 rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
4039 get_buffer_len(oa));
4040 leave_critical(zh);
4041 free_duplicate_path(server_path, path);
4042 /* We queued the buffer, so don't free it */
4043 close_buffer_oarchive(&oa, 0);
4044
4045 LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
4046 zoo_get_current_server(zh));
4047 /* make a best (non-blocking) effort to send the requests asap */
4048 adaptor_send_queue(zh, 0);
4049 return (rc < 0)?ZMARSHALLINGERROR:ZOK;
4050 }
4051
4052 int zoo_areconfig(zhandle_t *zh, const char *joining, const char *leaving,
4053 const char *members, int64_t version, data_completion_t dc, const void *data)
4054 {
4055 struct oarchive *oa;
4056 struct RequestHeader h = { get_xid(), ZOO_RECONFIG_OP };
4057 struct ReconfigRequest req;
4058 int rc = 0;
4059
4060 if (zh==0) {
4061 return ZBADARGUMENTS;
4062 }
4063 if (is_unrecoverable(zh)) {
4064 return ZINVALIDSTATE;
4065 }
4066
4067 oa=create_buffer_oarchive();
4068 req.joiningServers = (char *)joining;
4069 req.leavingServers = (char *)leaving;
4070 req.newMembers = (char *)members;
4071 req.curConfigId = version;
4072 rc = serialize_RequestHeader(oa, "header", &h);
4073 rc = rc < 0 ? rc : serialize_ReconfigRequest(oa, "req", &req);
4074 enter_critical(zh);
4075 rc = rc < 0 ? rc : add_data_completion(zh, h.xid, dc, data, NULL);
4076 rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
4077 get_buffer_len(oa));
4078 leave_critical(zh);
4079 /* We queued the buffer, so don't free it */
4080 close_buffer_oarchive(&oa, 0);
4081
4082 LOG_DEBUG(LOGCALLBACK(zh), "Sending Reconfig request xid=%#x to %s",h.xid, zoo_get_current_server(zh));
4083 /* make a best (non-blocking) effort to send the requests asap */
4084 adaptor_send_queue(zh, 0);
4085
4086 return (rc < 0)?ZMARSHALLINGERROR:ZOK;
4087 }
4088
4089 static int SetDataRequest_init(zhandle_t *zh, struct SetDataRequest *req,
4090 const char *path, const char *buffer, int buflen, int version)
4091 {
4092 int rc;
4093 assert(req);
4094 rc = Request_path_init(zh, 0, &req->path, path);
4095 if (rc != ZOK) {
4096 return rc;
4097 }
4098 req->data.buff = (char*)buffer;
4099 req->data.len = buflen;
4100 req->version = version;
4101
4102 return ZOK;
4103 }
4104
4105 int zoo_aset(zhandle_t *zh, const char *path, const char *buffer, int buflen,
4106 int version, stat_completion_t dc, const void *data)
4107 {
4108 struct oarchive *oa;
4109 struct RequestHeader h = {get_xid(), ZOO_SETDATA_OP};
4110 struct SetDataRequest req;
4111 int rc = SetDataRequest_init(zh, &req, path, buffer, buflen, version);
4112 if (rc != ZOK) {
4113 return rc;
4114 }
4115 oa = create_buffer_oarchive();
4116 rc = serialize_RequestHeader(oa, "header", &h);
4117 rc = rc < 0 ? rc : serialize_SetDataRequest(oa, "req", &req);
4118 enter_critical(zh);
4119 rc = rc < 0 ? rc : add_stat_completion(zh, h.xid, dc, data,0);
4120 rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
4121 get_buffer_len(oa));
4122 leave_critical(zh);
4123 free_duplicate_path(req.path, path);
4124 /* We queued the buffer, so don't free it */
4125 close_buffer_oarchive(&oa, 0);
4126
4127 LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
4128 zoo_get_current_server(zh));
4129 /* make a best (non-blocking) effort to send the requests asap */
4130 adaptor_send_queue(zh, 0);
4131 return (rc < 0)?ZMARSHALLINGERROR:ZOK;
4132 }
4133
4134 static int CreateRequest_init(zhandle_t *zh, struct CreateRequest *req,
4135 const char *path, const char *value,
4136 int valuelen, const struct ACL_vector *acl_entries, int mode)
4137 {
4138 int rc;
4139 assert(req);
4140 rc = Request_path_init(zh, mode, &req->path, path);
4141 assert(req);
4142 if (rc != ZOK) {
4143 return rc;
4144 }
4145 req->flags = mode;
4146 req->data.buff = (char*)value;
4147 req->data.len = valuelen;
4148 if (acl_entries == 0) {
4149 req->acl.count = 0;
4150 req->acl.data = 0;
4151 } else {
4152 req->acl = *acl_entries;
4153 }
4154
4155 return ZOK;
4156 }
4157
4158 static int CreateTTLRequest_init(zhandle_t *zh, struct CreateTTLRequest *req,
4159 const char *path, const char *value,
4160 int valuelen, const struct ACL_vector *acl_entries, int mode, int64_t ttl)
4161 {
4162 int rc;
4163 assert(req);
4164 rc = Request_path_init(zh, mode, &req->path, path);
4165 assert(req);
4166 if (rc != ZOK) {
4167 return rc;
4168 }
4169 req->flags = mode;
4170 req->data.buff = (char*)value;
4171 req->data.len = valuelen;
4172 if (acl_entries == 0) {
4173 req->acl.count = 0;
4174 req->acl.data = 0;
4175 } else {
4176 req->acl = *acl_entries;
4177 }
4178 req->ttl = ttl;
4179
4180 return ZOK;
4181 }
4182
4183 static int get_create_op_type(int mode, int default_op) {
4184 if (mode == ZOO_CONTAINER) {
4185 return ZOO_CREATE_CONTAINER_OP;
4186 } else if (ZOOKEEPER_IS_TTL(mode)) {
4187 return ZOO_CREATE_TTL_OP;
4188 } else {
4189 return default_op;
4190 }
4191 }
4192
4193 int zoo_acreate(zhandle_t *zh, const char *path, const char *value,
4194 int valuelen, const struct ACL_vector *acl_entries, int mode,
4195 string_completion_t completion, const void *data)
4196 {
4197 return zoo_acreate_ttl(zh, path, value, valuelen, acl_entries, mode, -1, completion, data);
4198 }
4199
4200 int zoo_acreate_ttl(zhandle_t *zh, const char *path, const char *value,
4201 int valuelen, const struct ACL_vector *acl_entries, int mode, int64_t ttl,
4202 string_completion_t completion, const void *data)
4203 {
4204 struct oarchive *oa;
4205 struct RequestHeader h = {get_xid(), get_create_op_type(mode, ZOO_CREATE_OP)};
4206 int rc;
4207 char *req_path;
4208
4209 if (ZOOKEEPER_IS_TTL(mode)) {
4210 struct CreateTTLRequest req;
4211
4212 if (ttl <= 0 || ttl > ZOO_MAX_TTL) {
4213 return ZBADARGUMENTS;
4214 }
4215
4216 rc = CreateTTLRequest_init(zh, &req,
4217 path, value, valuelen, acl_entries, mode, ttl);
4218 if (rc != ZOK) {
4219 return rc;
4220 }
4221 oa = create_buffer_oarchive();
4222 rc = serialize_RequestHeader(oa, "header", &h);
4223 rc = rc < 0 ? rc : serialize_CreateTTLRequest(oa, "req", &req);
4224
4225 req_path = req.path;
4226 } else {
4227 struct CreateRequest req;
4228
4229 if (ttl >= 0) {
4230 return ZBADARGUMENTS;
4231 }
4232
4233 rc = CreateRequest_init(zh, &req,
4234 path, value, valuelen, acl_entries, mode);
4235 if (rc != ZOK) {
4236 return rc;
4237 }
4238 oa = create_buffer_oarchive();
4239 rc = serialize_RequestHeader(oa, "header", &h);
4240 rc = rc < 0 ? rc : serialize_CreateRequest(oa, "req", &req);
4241
4242 req_path = req.path;
4243 }
4244
4245 enter_critical(zh);
4246 rc = rc < 0 ? rc : add_string_completion(zh, h.xid, completion, data);
4247 rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
4248 get_buffer_len(oa));
4249 leave_critical(zh);
4250 free_duplicate_path(req_path, path);
4251 /* We queued the buffer, so don't free it */
4252 close_buffer_oarchive(&oa, 0);
4253
4254 LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
4255 zoo_get_current_server(zh));
4256 /* make a best (non-blocking) effort to send the requests asap */
4257 adaptor_send_queue(zh, 0);
4258 return (rc < 0)?ZMARSHALLINGERROR:ZOK;
4259 }
4260
4261 int zoo_acreate2(zhandle_t *zh, const char *path, const char *value,
4262 int valuelen, const struct ACL_vector *acl_entries, int mode,
4263 string_stat_completion_t completion, const void *data)
4264 {
4265 return zoo_acreate2_ttl(zh, path, value, valuelen, acl_entries, mode, -1, completion, data);
4266 }
4267
4268 int zoo_acreate2_ttl(zhandle_t *zh, const char *path, const char *value,
4269 int valuelen, const struct ACL_vector *acl_entries, int mode, int64_t ttl,
4270 string_stat_completion_t completion, const void *data)
4271 {
4272 struct oarchive *oa;
4273 struct RequestHeader h = { get_xid(), get_create_op_type(mode, ZOO_CREATE2_OP) };
4274 int rc;
4275 char *req_path;
4276
4277 if (ZOOKEEPER_IS_TTL(mode)) {
4278 struct CreateTTLRequest req;
4279
4280 if (ttl <= 0 || ttl > ZOO_MAX_TTL) {
4281 return ZBADARGUMENTS;
4282 }
4283
4284 rc = CreateTTLRequest_init(zh, &req,
4285 path, value, valuelen, acl_entries, mode, ttl);
4286 if (rc != ZOK) {
4287 return rc;
4288 }
4289 oa = create_buffer_oarchive();
4290 rc = serialize_RequestHeader(oa, "header", &h);
4291 rc = rc < 0 ? rc : serialize_CreateTTLRequest(oa, "req", &req);
4292
4293 req_path = req.path;
4294 } else {
4295 struct CreateRequest req;
4296
4297 if (ttl >= 0) {
4298 return ZBADARGUMENTS;
4299 }
4300
4301 rc = CreateRequest_init(zh, &req, path, value, valuelen, acl_entries, mode);
4302 if (rc != ZOK) {
4303 return rc;
4304 }
4305 oa = create_buffer_oarchive();
4306 rc = serialize_RequestHeader(oa, "header", &h);
4307 rc = rc < 0 ? rc : serialize_CreateRequest(oa, "req", &req);
4308
4309 req_path = req.path;
4310 }
4311
4312 enter_critical(zh);
4313 rc = rc < 0 ? rc : add_string_stat_completion(zh, h.xid, completion, data);
4314 rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
4315 get_buffer_len(oa));
4316 leave_critical(zh);
4317 free_duplicate_path(req_path, path);
4318 /* We queued the buffer, so don't free it */
4319 close_buffer_oarchive(&oa, 0);
4320
4321 LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
4322 zoo_get_current_server(zh));
4323 /* make a best (non-blocking) effort to send the requests asap */
4324 adaptor_send_queue(zh, 0);
4325 return (rc < 0)?ZMARSHALLINGERROR:ZOK;
4326 }
4327
4328 int DeleteRequest_init(zhandle_t *zh, struct DeleteRequest *req,
4329 const char *path, int version)
4330 {
4331 int rc = Request_path_init(zh, 0, &req->path, path);
4332 if (rc != ZOK) {
4333 return rc;
4334 }
4335 req->version = version;
4336 return ZOK;
4337 }
4338
4339 int zoo_adelete(zhandle_t *zh, const char *path, int version,
4340 void_completion_t completion, const void *data)
4341 {
4342 struct oarchive *oa;
4343 struct RequestHeader h = {get_xid(), ZOO_DELETE_OP};
4344 struct DeleteRequest req;
4345 int rc = DeleteRequest_init(zh, &req, path, version);
4346 if (rc != ZOK) {
4347 return rc;
4348 }
4349 oa = create_buffer_oarchive();
4350 rc = serialize_RequestHeader(oa, "header", &h);
4351 rc = rc < 0 ? rc : serialize_DeleteRequest(oa, "req", &req);
4352 enter_critical(zh);
4353 rc = rc < 0 ? rc : add_void_completion(zh, h.xid, completion, data);
4354 rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
4355 get_buffer_len(oa));
4356 leave_critical(zh);
4357 free_duplicate_path(req.path, path);
4358 /* We queued the buffer, so don't free it */
4359 close_buffer_oarchive(&oa, 0);
4360
4361 LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
4362 zoo_get_current_server(zh));
4363 /* make a best (non-blocking) effort to send the requests asap */
4364 adaptor_send_queue(zh, 0);
4365 return (rc < 0)?ZMARSHALLINGERROR:ZOK;
4366 }
4367
4368 int zoo_aexists(zhandle_t *zh, const char *path, int watch,
4369 stat_completion_t sc, const void *data)
4370 {
4371 return zoo_awexists(zh,path,watch?zh->watcher:0,zh->context,sc,data);
4372 }
4373
4374 int zoo_awexists(zhandle_t *zh, const char *path,
4375 watcher_fn watcher, void* watcherCtx,
4376 stat_completion_t completion, const void *data)
4377 {
4378 struct oarchive *oa;
4379 struct RequestHeader h = {get_xid(), ZOO_EXISTS_OP};
4380 struct ExistsRequest req;
4381 int rc = Request_path_watch_init(zh, 0, &req.path, path,
4382 &req.watch, watcher != NULL);
4383 if (rc != ZOK) {
4384 return rc;
4385 }
4386 oa = create_buffer_oarchive();
4387 rc = serialize_RequestHeader(oa, "header", &h);
4388 rc = rc < 0 ? rc : serialize_ExistsRequest(oa, "req", &req);
4389 enter_critical(zh);
4390 rc = rc < 0 ? rc : add_stat_completion(zh, h.xid, completion, data,
4391 create_watcher_registration(req.path,exists_result_checker,
4392 watcher,watcherCtx));
4393 rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
4394 get_buffer_len(oa));
4395 leave_critical(zh);
4396 free_duplicate_path(req.path, path);
4397 /* We queued the buffer, so don't free it */
4398 close_buffer_oarchive(&oa, 0);
4399
4400 LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
4401 zoo_get_current_server(zh));
4402 /* make a best (non-blocking) effort to send the requests asap */
4403 adaptor_send_queue(zh, 0);
4404 return (rc < 0)?ZMARSHALLINGERROR:ZOK;
4405 }
4406
4407 static int zoo_awget_children_(zhandle_t *zh, const char *path,
4408 watcher_fn watcher, void* watcherCtx,
4409 strings_completion_t sc,
4410 const void *data)
4411 {
4412 struct oarchive *oa;
4413 struct RequestHeader h = {get_xid(), ZOO_GETCHILDREN_OP};
4414 struct GetChildrenRequest req ;
4415 int rc = Request_path_watch_init(zh, 0, &req.path, path,
4416 &req.watch, watcher != NULL);
4417 if (rc != ZOK) {
4418 return rc;
4419 }
4420 oa = create_buffer_oarchive();
4421 rc = serialize_RequestHeader(oa, "header", &h);
4422 rc = rc < 0 ? rc : serialize_GetChildrenRequest(oa, "req", &req);
4423 enter_critical(zh);
4424 rc = rc < 0 ? rc : add_strings_completion(zh, h.xid, sc, data,
4425 create_watcher_registration(req.path,child_result_checker,watcher,watcherCtx));
4426 rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
4427 get_buffer_len(oa));
4428 leave_critical(zh);
4429 free_duplicate_path(req.path, path);
4430 /* We queued the buffer, so don't free it */
4431 close_buffer_oarchive(&oa, 0);
4432
4433 LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
4434 zoo_get_current_server(zh));
4435 /* make a best (non-blocking) effort to send the requests asap */
4436 adaptor_send_queue(zh, 0);
4437 return (rc < 0)?ZMARSHALLINGERROR:ZOK;
4438 }
4439
4440 int zoo_aget_children(zhandle_t *zh, const char *path, int watch,
4441 strings_completion_t dc, const void *data)
4442 {
4443 return zoo_awget_children_(zh,path,watch?zh->watcher:0,zh->context,dc,data);
4444 }
4445
4446 int zoo_awget_children(zhandle_t *zh, const char *path,
4447 watcher_fn watcher, void* watcherCtx,
4448 strings_completion_t dc,
4449 const void *data)
4450 {
4451 return zoo_awget_children_(zh,path,watcher,watcherCtx,dc,data);
4452 }
4453
4454 static int zoo_awget_children2_(zhandle_t *zh, const char *path,
4455 watcher_fn watcher, void* watcherCtx,
4456 strings_stat_completion_t ssc,
4457 const void *data)
4458 {
4459 /* invariant: (sc == NULL) != (sc == NULL) */
4460 struct oarchive *oa;
4461 struct RequestHeader h = {get_xid(), ZOO_GETCHILDREN2_OP};
4462 struct GetChildren2Request req ;
4463 int rc = Request_path_watch_init(zh, 0, &req.path, path,
4464 &req.watch, watcher != NULL);
4465 if (rc != ZOK) {
4466 return rc;
4467 }
4468 oa = create_buffer_oarchive();
4469 rc = serialize_RequestHeader(oa, "header", &h);
4470 rc = rc < 0 ? rc : serialize_GetChildren2Request(oa, "req", &req);
4471 enter_critical(zh);
4472 rc = rc < 0 ? rc : add_strings_stat_completion(zh, h.xid, ssc, data,
4473 create_watcher_registration(req.path,child_result_checker,watcher,watcherCtx));
4474 rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
4475 get_buffer_len(oa));
4476 leave_critical(zh);
4477 free_duplicate_path(req.path, path);
4478 /* We queued the buffer, so don't free it */
4479 close_buffer_oarchive(&oa, 0);
4480
4481 LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
4482 zoo_get_current_server(zh));
4483 /* make a best (non-blocking) effort to send the requests asap */
4484 adaptor_send_queue(zh, 0);
4485 return (rc < 0)?ZMARSHALLINGERROR:ZOK;
4486 }
4487
4488 int zoo_aget_children2(zhandle_t *zh, const char *path, int watch,
4489 strings_stat_completion_t dc, const void *data)
4490 {
4491 return zoo_awget_children2_(zh,path,watch?zh->watcher:0,zh->context,dc,data);
4492 }
4493
4494 int zoo_awget_children2(zhandle_t *zh, const char *path,
4495 watcher_fn watcher, void* watcherCtx,
4496 strings_stat_completion_t dc,
4497 const void *data)
4498 {
4499 return zoo_awget_children2_(zh,path,watcher,watcherCtx,dc,data);
4500 }
4501
4502 int zoo_async(zhandle_t *zh, const char *path,
4503 string_completion_t completion, const void *data)
4504 {
4505 struct oarchive *oa;
4506 struct RequestHeader h = {get_xid(), ZOO_SYNC_OP};
4507 struct SyncRequest req;
4508 int rc = Request_path_init(zh, 0, &req.path, path);
4509 if (rc != ZOK) {
4510 return rc;
4511 }
4512 oa = create_buffer_oarchive();
4513 rc = serialize_RequestHeader(oa, "header", &h);
4514 rc = rc < 0 ? rc : serialize_SyncRequest(oa, "req", &req);
4515 enter_critical(zh);
4516 rc = rc < 0 ? rc : add_string_completion(zh, h.xid, completion, data);
4517 rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
4518 get_buffer_len(oa));
4519 leave_critical(zh);
4520 free_duplicate_path(req.path, path);
4521 /* We queued the buffer, so don't free it */
4522 close_buffer_oarchive(&oa, 0);
4523
4524 LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
4525 zoo_get_current_server(zh));
4526 /* make a best (non-blocking) effort to send the requests asap */
4527 adaptor_send_queue(zh, 0);
4528 return (rc < 0)?ZMARSHALLINGERROR:ZOK;
4529 }
4530
4531
4532 int zoo_aget_acl(zhandle_t *zh, const char *path, acl_completion_t completion,
4533 const void *data)
4534 {
4535 struct oarchive *oa;
4536 struct RequestHeader h = {get_xid(), ZOO_GETACL_OP};
4537 struct GetACLRequest req;
4538 int rc = Request_path_init(zh, 0, &req.path, path) ;
4539 if (rc != ZOK) {
4540 return rc;
4541 }
4542 oa = create_buffer_oarchive();
4543 rc = serialize_RequestHeader(oa, "header", &h);
4544 rc = rc < 0 ? rc : serialize_GetACLRequest(oa, "req", &req);
4545 enter_critical(zh);
4546 rc = rc < 0 ? rc : add_acl_completion(zh, h.xid, completion, data);
4547 rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
4548 get_buffer_len(oa));
4549 leave_critical(zh);
4550 free_duplicate_path(req.path, path);
4551 /* We queued the buffer, so don't free it */
4552 close_buffer_oarchive(&oa, 0);
4553
4554 LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
4555 zoo_get_current_server(zh));
4556 /* make a best (non-blocking) effort to send the requests asap */
4557 adaptor_send_queue(zh, 0);
4558 return (rc < 0)?ZMARSHALLINGERROR:ZOK;
4559 }
4560
4561 int zoo_aset_acl(zhandle_t *zh, const char *path, int version,
4562 struct ACL_vector *acl, void_completion_t completion, const void *data)
4563 {
4564 struct oarchive *oa;
4565 struct RequestHeader h = {get_xid(), ZOO_SETACL_OP};
4566 struct SetACLRequest req;
4567 int rc = Request_path_init(zh, 0, &req.path, path);
4568 if (rc != ZOK) {
4569 return rc;
4570 }
4571 oa = create_buffer_oarchive();
4572 req.acl = *acl;
4573 req.version = version;
4574 rc = serialize_RequestHeader(oa, "header", &h);
4575 rc = rc < 0 ? rc : serialize_SetACLRequest(oa, "req", &req);
4576 enter_critical(zh);
4577 rc = rc < 0 ? rc : add_void_completion(zh, h.xid, completion, data);
4578 rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
4579 get_buffer_len(oa));
4580 leave_critical(zh);
4581 free_duplicate_path(req.path, path);
4582 /* We queued the buffer, so don't free it */
4583 close_buffer_oarchive(&oa, 0);
4584
4585 LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
4586 zoo_get_current_server(zh));
4587 /* make a best (non-blocking) effort to send the requests asap */
4588 adaptor_send_queue(zh, 0);
4589 return (rc < 0)?ZMARSHALLINGERROR:ZOK;
4590 }
4591
4592 /* Completions for multi-op results */
4593 static void op_result_string_completion(int err, const char *value, const void *data)
4594 {
4595 struct zoo_op_result *result = (struct zoo_op_result *)data;
4596 assert(result);
4597 result->err = err;
4598
4599 if (result->value && value) {
4600 int len = strlen(value) + 1;
4601 if (len > result->valuelen) {
4602 len = result->valuelen;
4603 }
4604 if (len > 0) {
4605 memcpy(result->value, value, len - 1);
4606 result->value[len - 1] = '\0';
4607 }
4608 } else {
4609 result->value = NULL;
4610 }
4611 }
4612
4613 static void op_result_void_completion(int err, const void *data)
4614 {
4615 struct zoo_op_result *result = (struct zoo_op_result *)data;
4616 assert(result);
4617 result->err = err;
4618 }
4619
4620 static void op_result_stat_completion(int err, const struct Stat *stat, const void *data)
4621 {
4622 struct zoo_op_result *result = (struct zoo_op_result *)data;
4623 assert(result);
4624 result->err = err;
4625
4626 if (result->stat && err == 0 && stat) {
4627 *result->stat = *stat;
4628 } else {
4629 result->stat = NULL ;
4630 }
4631 }
4632
4633 static int CheckVersionRequest_init(zhandle_t *zh, struct CheckVersionRequest *req,
4634 const char *path, int version)
4635 {
4636 int rc ;
4637 assert(req);
4638 rc = Request_path_init(zh, 0, &req->path, path);
4639 if (rc != ZOK) {
4640 return rc;
4641 }
4642 req->version = version;
4643
4644 return ZOK;
4645 }
4646
4647 int zoo_amulti(zhandle_t *zh, int count, const zoo_op_t *ops,
4648 zoo_op_result_t *results, void_completion_t completion, const void *data)
4649 {
4650 struct RequestHeader h = {get_xid(), ZOO_MULTI_OP};
4651 struct MultiHeader mh = {-1, 1, -1};
4652 struct oarchive *oa = create_buffer_oarchive();
4653 completion_head_t clist = { 0 };
4654
4655 int rc = serialize_RequestHeader(oa, "header", &h);
4656
4657 int index = 0;
4658 for (index=0; index < count; index++) {
4659 const zoo_op_t *op = ops+index;
4660 zoo_op_result_t *result = results+index;
4661 completion_list_t *entry = NULL;
4662
4663 struct MultiHeader mh = {op->type, 0, -1};
4664 rc = rc < 0 ? rc : serialize_MultiHeader(oa, "multiheader", &mh);
4665
4666 switch(op->type) {
4667 case ZOO_CREATE_CONTAINER_OP:
4668 case ZOO_CREATE_OP: {
4669 struct CreateRequest req;
4670
4671 rc = rc < 0 ? rc : CreateRequest_init(zh, &req,
4672 op->create_op.path, op->create_op.data,
4673 op->create_op.datalen, op->create_op.acl,
4674 op->create_op.flags);
4675 rc = rc < 0 ? rc : serialize_CreateRequest(oa, "req", &req);
4676 result->value = op->create_op.buf;
4677 result->valuelen = op->create_op.buflen;
4678
4679 enter_critical(zh);
4680 entry = create_completion_entry(zh, h.xid, COMPLETION_STRING, op_result_string_completion, result, 0, 0);
4681 leave_critical(zh);
4682 free_duplicate_path(req.path, op->create_op.path);
4683 break;
4684 }
4685
4686 case ZOO_DELETE_OP: {
4687 struct DeleteRequest req;
4688 rc = rc < 0 ? rc : DeleteRequest_init(zh, &req, op->delete_op.path, op->delete_op.version);
4689 rc = rc < 0 ? rc : serialize_DeleteRequest(oa, "req", &req);
4690
4691 enter_critical(zh);
4692 entry = create_completion_entry(zh, h.xid, COMPLETION_VOID, op_result_void_completion, result, 0, 0);
4693 leave_critical(zh);
4694 free_duplicate_path(req.path, op->delete_op.path);
4695 break;
4696 }
4697
4698 case ZOO_SETDATA_OP: {
4699 struct SetDataRequest req;
4700 rc = rc < 0 ? rc : SetDataRequest_init(zh, &req,
4701 op->set_op.path, op->set_op.data,
4702 op->set_op.datalen, op->set_op.version);
4703 rc = rc < 0 ? rc : serialize_SetDataRequest(oa, "req", &req);
4704 result->stat = op->set_op.stat;
4705
4706 enter_critical(zh);
4707 entry = create_completion_entry(zh, h.xid, COMPLETION_STAT, op_result_stat_completion, result, 0, 0);
4708 leave_critical(zh);
4709 free_duplicate_path(req.path, op->set_op.path);
4710 break;
4711 }
4712
4713 case ZOO_CHECK_OP: {
4714 struct CheckVersionRequest req;
4715 rc = rc < 0 ? rc : CheckVersionRequest_init(zh, &req,
4716 op->check_op.path, op->check_op.version);
4717 rc = rc < 0 ? rc : serialize_CheckVersionRequest(oa, "req", &req);
4718
4719 enter_critical(zh);
4720 entry = create_completion_entry(zh, h.xid, COMPLETION_VOID, op_result_void_completion, result, 0, 0);
4721 leave_critical(zh);
4722 free_duplicate_path(req.path, op->check_op.path);
4723 break;
4724 }
4725
4726 default:
4727 LOG_ERROR(LOGCALLBACK(zh), "Unimplemented sub-op type=%d in multi-op", op->type);
4728 return ZUNIMPLEMENTED;
4729 }
4730
4731 queue_completion(&clist, entry, 0);
4732 }
4733
4734 rc = rc < 0 ? rc : serialize_MultiHeader(oa, "multiheader", &mh);
4735
4736 /* BEGIN: CRTICIAL SECTION */
4737 enter_critical(zh);
4738 rc = rc < 0 ? rc : add_multi_completion(zh, h.xid, completion, data, &clist);
4739 rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
4740 get_buffer_len(oa));
4741 leave_critical(zh);
4742
4743 /* We queued the buffer, so don't free it */
4744 close_buffer_oarchive(&oa, 0);
4745
4746 LOG_DEBUG(LOGCALLBACK(zh), "Sending multi request xid=%#x with %d subrequests to %s",
4747 h.xid, index, zoo_get_current_server(zh));
4748 /* make a best (non-blocking) effort to send the requests asap */
4749 adaptor_send_queue(zh, 0);
4750
4751 return (rc < 0) ? ZMARSHALLINGERROR : ZOK;
4752 }
4753
4754 typedef union WatchesRequest WatchesRequest;
4755
4756 union WatchesRequest {
4757 struct CheckWatchesRequest check;
4758 struct RemoveWatchesRequest remove;
4759 };
4760
4761 static int aremove_watches(
4762 zhandle_t *zh, const char *path, ZooWatcherType wtype,
4763 watcher_fn watcher, void *watcherCtx, int local,
4764 void_completion_t *completion, const void *data, int all)
4765 {
4766 char *server_path = prepend_string(zh, path);
4767 int rc;
4768 struct oarchive *oa;
4769 struct RequestHeader h = {
4770 get_xid(),
4771 all ? ZOO_REMOVE_WATCHES : ZOO_CHECK_WATCHES
4772 };
4773 WatchesRequest req;
4774 watcher_deregistration_t *wdo;
4775
4776 if (!zh || !isValidPath(server_path, 0)) {
4777 rc = ZBADARGUMENTS;
4778 goto done;
4779 }
4780
4781 if (!local && is_unrecoverable(zh)) {
4782 rc = ZINVALIDSTATE;
4783 goto done;
4784 }
4785
4786 lock_watchers(zh);
4787 if (!pathHasWatcher(zh, server_path, wtype, watcher, watcherCtx)) {
4788 rc = ZNOWATCHER;
4789 unlock_watchers(zh);
4790 goto done;
4791 }
4792
4793 if (local) {
4794 removeWatchers(zh, server_path, wtype, watcher, watcherCtx);
4795 unlock_watchers(zh);
4796 #ifdef THREADED
4797 notify_sync_completion((struct sync_completion *)data);
4798 #endif
4799 rc = ZOK;
4800 goto done;
4801 }
4802 unlock_watchers(zh);
4803
4804 oa = create_buffer_oarchive();
4805 rc = serialize_RequestHeader(oa, "header", &h);
4806
4807 if (all) {
4808 req.remove.path = (char*)server_path;
4809 req.remove.type = wtype;
4810 rc = rc < 0 ? rc : serialize_RemoveWatchesRequest(oa, "req", &req.remove);
4811 } else {
4812 req.check.path = (char*)server_path;
4813 req.check.type = wtype;
4814 rc = rc < 0 ? rc : serialize_CheckWatchesRequest(oa, "req", &req.check);
4815 }
4816
4817 if (rc < 0) {
4818 goto done;
4819 }
4820
4821 wdo = create_watcher_deregistration(
4822 server_path, watcher, watcherCtx, wtype);
4823
4824 if (!wdo) {
4825 rc = ZSYSTEMERROR;
4826 goto done;
4827 }
4828
4829 enter_critical(zh);
4830 rc = add_completion_deregistration(
4831 zh, h.xid, COMPLETION_VOID, completion, data, 0, wdo, 0);
4832 rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
4833 get_buffer_len(oa));
4834 rc = rc < 0 ? ZMARSHALLINGERROR : ZOK;
4835 leave_critical(zh);
4836
4837 /* We queued the buffer, so don't free it */
4838 close_buffer_oarchive(&oa, 0);
4839
4840 LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",
4841 h.xid, path, zoo_get_current_server(zh));
4842
4843 adaptor_send_queue(zh, 0);
4844
4845 done:
4846 free_duplicate_path(server_path, path);
4847 return rc;
4848 }
4849 void zoo_create_op_init(zoo_op_t *op, const char *path, const char *value,
4850 int valuelen, const struct ACL_vector *acl, int mode,
4851 char *path_buffer, int path_buffer_len)
4852 {
4853 assert(op);
4854 op->type = get_create_op_type(mode, ZOO_CREATE_OP);
4855 op->create_op.path = path;
4856 op->create_op.data = value;
4857 op->create_op.datalen = valuelen;
4858 op->create_op.acl = acl;
4859 op->create_op.flags = mode;
4860 op->create_op.ttl = 0;
4861 op->create_op.buf = path_buffer;
4862 op->create_op.buflen = path_buffer_len;
4863 }
4864
4865 void zoo_create2_op_init(zoo_op_t *op, const char *path, const char *value,
4866 int valuelen, const struct ACL_vector *acl, int mode,
4867 char *path_buffer, int path_buffer_len)
4868 {
4869 assert(op);
4870 op->type = get_create_op_type(mode, ZOO_CREATE2_OP);
4871 op->create_op.path = path;
4872 op->create_op.data = value;
4873 op->create_op.datalen = valuelen;
4874 op->create_op.acl = acl;
4875 op->create_op.flags = mode;
4876 op->create_op.buf = path_buffer;
4877 op->create_op.buflen = path_buffer_len;
4878 }
4879
4880 void zoo_delete_op_init(zoo_op_t *op, const char *path, int version)
4881 {
4882 assert(op);
4883 op->type = ZOO_DELETE_OP;
4884 op->delete_op.path = path;
4885 op->delete_op.version = version;
4886 }
4887
4888 void zoo_set_op_init(zoo_op_t *op, const char *path, const char *buffer,
4889 int buflen, int version, struct Stat *stat)
4890 {
4891 assert(op);
4892 op->type = ZOO_SETDATA_OP;
4893 op->set_op.path = path;
4894 op->set_op.data = buffer;
4895 op->set_op.datalen = buflen;
4896 op->set_op.version = version;
4897 op->set_op.stat = stat;
4898 }
4899
4900 void zoo_check_op_init(zoo_op_t *op, const char *path, int version)
4901 {
4902 assert(op);
4903 op->type = ZOO_CHECK_OP;
4904 op->check_op.path = path;
4905 op->check_op.version = version;
4906 }
4907
4908 /* specify timeout of 0 to make the function non-blocking */
4909 /* timeout is in milliseconds */
4910 int flush_send_queue(zhandle_t*zh, int timeout)
4911 {
4912 int rc= ZOK;
4913 struct timeval started;
4914 #ifdef _WIN32
4915 fd_set pollSet;
4916 struct timeval wait;
4917 #endif
4918 get_system_time(&started);
4919 // we can't use dequeue_buffer() here because if (non-blocking) send_buffer()
4920 // returns EWOULDBLOCK we'd have to put the buffer back on the queue.
4921 // we use a recursive lock instead and only dequeue the buffer if a send was
4922 // successful
4923 lock_buffer_list(&zh->to_send);
4924 while (zh->to_send.head != 0 && (is_connected(zh) || is_sasl_auth_in_progress(zh))) {
4925 if (is_sasl_auth_in_progress(zh)) {
4926 // We don't let non-SASL packets escape as long as
4927 // negotiation is not complete. (SASL packets are always
4928 // pushed to the front of the queue.)
4929 buffer_list_t *buff = zh->to_send.head;
4930 int32_t type;
4931
4932 rc = extract_request_type(buff->buffer, buff->len, &type);
4933
4934 if (rc < 0 || type != ZOO_SASL_OP) {
4935 break;
4936 }
4937 }
4938
4939 if(timeout!=0){
4940 #ifndef _WIN32
4941 struct pollfd fds;
4942 #endif
4943 int elapsed;
4944 struct timeval now;
4945 get_system_time(&now);
4946 elapsed=calculate_interval(&started,&now);
4947 if (elapsed>timeout) {
4948 rc = ZOPERATIONTIMEOUT;
4949 break;
4950 }
4951
4952 #ifdef _WIN32
4953 wait = get_timeval(timeout-elapsed);
4954 FD_ZERO(&pollSet);
4955 FD_SET(zh->fd->sock, &pollSet);
4956 // Poll the socket
4957 rc = select((int)(zh->fd->sock)+1, NULL, &pollSet, NULL, &wait);
4958 #else
4959 fds.fd = zh->fd->sock;
4960 fds.events = POLLOUT;
4961 fds.revents = 0;
4962 rc = poll(&fds, 1, timeout-elapsed);
4963 #endif
4964 if (rc<=0) {
4965 /* timed out or an error or POLLERR */
4966 rc = rc==0 ? ZOPERATIONTIMEOUT : ZSYSTEMERROR;
4967 break;
4968 }
4969 }
4970
4971 rc = send_buffer(zh, zh->to_send.head);
4972 if(rc==0 && timeout==0){
4973 /* send_buffer would block while sending this buffer */
4974 rc = ZOK;
4975 break;
4976 }
4977 if (rc < 0) {
4978 rc = ZCONNECTIONLOSS;
4979 break;
4980 }
4981 // if the buffer has been sent successfully, remove it from the queue
4982 if (rc > 0)
4983 remove_buffer(&zh->to_send);
4984 get_system_time(&zh->last_send);
4985 rc = ZOK;
4986 }
4987 unlock_buffer_list(&zh->to_send);
4988 return rc;
4989 }
4990
4991 const char* zerror(int c)
4992 {
4993 switch (c){
4994 case ZOK:
4995 return "ok";
4996 case ZSYSTEMERROR:
4997 return "system error";
4998 case ZRUNTIMEINCONSISTENCY:
4999 return "run time inconsistency";
5000 case ZDATAINCONSISTENCY:
5001 return "data inconsistency";
5002 case ZCONNECTIONLOSS:
5003 return "connection loss";
5004 case ZMARSHALLINGERROR:
5005 return "marshalling error";
5006 case ZUNIMPLEMENTED:
5007 return "unimplemented";
5008 case ZOPERATIONTIMEOUT:
5009 return "operation timeout";
5010 case ZBADARGUMENTS:
5011 return "bad arguments";
5012 case ZINVALIDSTATE:
5013 return "invalid zhandle state";
5014 case ZNEWCONFIGNOQUORUM:
5015 return "no quorum of new config is connected and up-to-date with the leader of last commmitted config - try invoking reconfiguration after new servers are connected and synced";
5016 case ZRECONFIGINPROGRESS:
5017 return "Another reconfiguration is in progress -- concurrent reconfigs not supported (yet)";
5018 case ZAPIERROR:
5019 return "api error";
5020 case ZNONODE:
5021 return "no node";
5022 case ZNOAUTH:
5023 return "not authenticated";
5024 case ZBADVERSION:
5025 return "bad version";
5026 case ZNOCHILDRENFOREPHEMERALS:
5027 return "no children for ephemerals";
5028 case ZNODEEXISTS:
5029 return "node exists";
5030 case ZNOTEMPTY:
5031 return "not empty";
5032 case ZSESSIONEXPIRED:
5033 return "session expired";
5034 case ZINVALIDCALLBACK:
5035 return "invalid callback";
5036 case ZINVALIDACL:
5037 return "invalid acl";
5038 case ZAUTHFAILED:
5039 return "authentication failed";
5040 case ZCLOSING:
5041 return "zookeeper is closing";
5042 case ZNOTHING:
5043 return "(not error) no server responses to process";
5044 case ZSESSIONMOVED:
5045 return "session moved to another server, so operation is ignored";
5046 case ZNOTREADONLY:
5047 return "state-changing request is passed to read-only server";
5048 case ZEPHEMERALONLOCALSESSION:
5049 return "attempt to create ephemeral node on a local session";
5050 case ZNOWATCHER:
5051 return "the watcher couldn't be found";
5052 case ZRECONFIGDISABLED:
5053 return "attempts to perform a reconfiguration operation when reconfiguration feature is disable";
5054 case ZSESSIONCLOSEDREQUIRESASLAUTH:
5055 return "session closed by server because client is required to do SASL authentication";
5056 case ZTHROTTLEDOP:
5057 return "Operation was throttled due to high load";
5058 }
5059 if (c > 0) {
5060 return strerror(c);
5061 }
5062 return "unknown error";
5063 }
5064
5065 int zoo_add_auth(zhandle_t *zh,const char* scheme,const char* cert,
5066 int certLen,void_completion_t completion, const void *data)
5067 {
5068 struct buffer auth;
5069 auth_info *authinfo;
5070 if(scheme==NULL || zh==NULL)
5071 return ZBADARGUMENTS;
5072
5073 if (is_unrecoverable(zh))
5074 return ZINVALIDSTATE;
5075
5076 // [ZOOKEEPER-800] zoo_add_auth should return ZINVALIDSTATE if
5077 // the connection is closed.
5078 if (zoo_state(zh) == 0) {
5079 return ZINVALIDSTATE;
5080 }
5081
5082 if(cert!=NULL && certLen!=0){
5083 auth.buff=calloc(1,certLen);
5084 if(auth.buff==0) {
5085 return ZSYSTEMERROR;
5086 }
5087 memcpy(auth.buff,cert,certLen);
5088 auth.len=certLen;
5089 } else {
5090 auth.buff = 0;
5091 auth.len = 0;
5092 }
5093
5094 zoo_lock_auth(zh);
5095 authinfo = (auth_info*) malloc(sizeof(auth_info));
5096 authinfo->scheme=strdup(scheme);
5097 authinfo->auth=auth;
5098 authinfo->completion=completion;
5099 authinfo->data=data;
5100 authinfo->next = NULL;
5101 add_last_auth(&zh->auth_h, authinfo);
5102 zoo_unlock_auth(zh);
5103
5104 if (is_connected(zh) ||
5105 // When associating, only send info packets if no SASL
5106 // negotiation is planned. (Such packets would be queued in
5107 // front of SASL packets, which is forbidden, and SASL
5108 // completion is followed by a 'send_auth_info' anyway.)
5109 (zh->state == ZOO_ASSOCIATING_STATE && !has_sasl_client(zh))) {
5110 return send_last_auth_info(zh);
5111 }
5112
5113 return ZOK;
5114 }
5115
/*
 * Render a socket address as "host:port", or "[host]:port" for IPv6, for
 * log messages.
 *
 * The text is written into a function-local static buffer. Each call
 * overwrites the previous result, so concurrent callers race on it.
 * A NULL address yields the literal string "null".
 */
static const char* format_endpoint_info(const struct sockaddr_storage* ep)
{
    static char buf[134] = { 0 };
    char addrstr[INET6_ADDRSTRLEN] = { 0 };
    const char *fmtstring = "%s:%d";
    const void *inaddr;
    int port;
#ifdef _WIN32
    char *addrstring;
#endif

    if (ep == 0)
        return "null";

#if defined(AF_INET6)
    if (ep->ss_family == AF_INET6) {
        const struct sockaddr_in6 *sa6 = (const struct sockaddr_in6 *)ep;
        inaddr = &sa6->sin6_addr;
        port = sa6->sin6_port;
        /* Brackets keep the IPv6 colons apart from the port separator. */
        fmtstring = "[%s]:%d";
    } else
#endif
    {
        const struct sockaddr_in *sa4 = (const struct sockaddr_in *)ep;
        inaddr = &sa4->sin_addr;
        port = sa4->sin_port;
    }

    /* port is still in network byte order here. */
#ifdef _WIN32
    addrstring = inet_ntoa(*(const struct in_addr *)inaddr);
    sprintf(buf, fmtstring, addrstring, ntohs(port));
#else
    inet_ntop(ep->ss_family, inaddr, addrstr, sizeof(addrstr) - 1);
    sprintf(buf, fmtstring, addrstr, ntohs(port));
#endif
    return buf;
}
5152
5153 log_callback_fn zoo_get_log_callback(const zhandle_t* zh)
5154 {
5155 // Verify we have a valid handle
5156 if (zh == NULL) {
5157 return NULL;
5158 }
5159
5160 return zh->log_callback;
5161 }
5162
5163 void zoo_set_log_callback(zhandle_t *zh, log_callback_fn callback)
5164 {
5165 // Verify we have a valid handle
5166 if (zh == NULL) {
5167 return;
5168 }
5169
5170 zh->log_callback = callback;
5171 }
5172
5173 void zoo_deterministic_conn_order(int yesOrNo)
5174 {
5175 disable_conn_permute=yesOrNo;
5176 }
5177
5178 #ifdef THREADED
5179
5180 static void process_sync_completion(zhandle_t *zh,
5181 completion_list_t *cptr,
5182 struct sync_completion *sc,
5183 struct iarchive *ia)
5184 {
5185 LOG_DEBUG(LOGCALLBACK(zh), "Processing sync_completion with type=%d xid=%#x rc=%d",
5186 cptr->c.type, cptr->xid, sc->rc);
5187
5188 switch(cptr->c.type) {
5189 case COMPLETION_DATA:
5190 if (sc->rc==0) {
5191 struct GetDataResponse res;
5192 int len;
5193 deserialize_GetDataResponse(ia, "reply", &res);
5194 if (res.data.len <= sc->u.data.buff_len) {
5195 len = res.data.len;
5196 } else {
5197 len = sc->u.data.buff_len;
5198 }
5199 sc->u.data.buff_len = len;
5200 // check if len is negative
5201 // just of NULL which is -1 int
5202 if (len == -1) {
5203 sc->u.data.buffer = NULL;
5204 } else {
5205 memcpy(sc->u.data.buffer, res.data.buff, len);
5206 }
5207 sc->u.data.stat = res.stat;
5208 deallocate_GetDataResponse(&res);
5209 }
5210 break;
5211 case COMPLETION_STAT:
5212 if (sc->rc==0) {
5213 struct SetDataResponse res;
5214 deserialize_SetDataResponse(ia, "reply", &res);
5215 sc->u.stat = res.stat;
5216 deallocate_SetDataResponse(&res);
5217 }
5218 break;
5219 case COMPLETION_STRINGLIST:
5220 if (sc->rc==0) {
5221 struct GetChildrenResponse res;
5222 deserialize_GetChildrenResponse(ia, "reply", &res);
5223 sc->u.strs2 = res.children;
5224 /* We don't deallocate since we are passing it back */
5225 // deallocate_GetChildrenResponse(&res);
5226 }
5227 break;
5228 case COMPLETION_STRINGLIST_STAT:
5229 if (sc->rc==0) {
5230 struct GetChildren2Response res;
5231 deserialize_GetChildren2Response(ia, "reply", &res);
5232 sc->u.strs_stat.strs2 = res.children;
5233 sc->u.strs_stat.stat2 = res.stat;
5234 /* We don't deallocate since we are passing it back */
5235 // deallocate_GetChildren2Response(&res);
5236 }
5237 break;
5238 case COMPLETION_STRING:
5239 if (sc->rc==0) {
5240 struct CreateResponse res;
5241 int len;
5242 const char * client_path;
5243 deserialize_CreateResponse(ia, "reply", &res);
5244 //ZOOKEEPER-1027
5245 client_path = sub_string(zh, res.path);
5246 len = strlen(client_path) + 1;if (len > sc->u.str.str_len) {
5247 len = sc->u.str.str_len;
5248 }
5249 if (len > 0) {
5250 memcpy(sc->u.str.str, client_path, len - 1);
5251 sc->u.str.str[len - 1] = '\0';
5252 }
5253 free_duplicate_path(client_path, res.path);
5254 deallocate_CreateResponse(&res);
5255 }
5256 break;
5257 case COMPLETION_STRING_STAT:
5258 if (sc->rc==0) {
5259 struct Create2Response res;
5260 int len;
5261 const char * client_path;
5262 deserialize_Create2Response(ia, "reply", &res);
5263 client_path = sub_string(zh, res.path);
5264 len = strlen(client_path) + 1;
5265 if (len > sc->u.str.str_len) {
5266 len = sc->u.str.str_len;
5267 }
5268 if (len > 0) {
5269 memcpy(sc->u.str.str, client_path, len - 1);
5270 sc->u.str.str[len - 1] = '\0';
5271 }
5272 free_duplicate_path(client_path, res.path);
5273 sc->u.stat = res.stat;
5274 deallocate_Create2Response(&res);
5275 }
5276 break;
5277 case COMPLETION_ACLLIST:
5278 if (sc->rc==0) {
5279 struct GetACLResponse res;
5280 deserialize_GetACLResponse(ia, "reply", &res);
5281 sc->u.acl.acl = res.acl;
5282 sc->u.acl.stat = res.stat;
5283 /* We don't deallocate since we are passing it back */
5284 //deallocate_GetACLResponse(&res);
5285 }
5286 break;
5287 case COMPLETION_VOID:
5288 break;
5289 case COMPLETION_MULTI:
5290 sc->rc = deserialize_multi(zh, cptr->xid, cptr, ia);
5291 break;
5292 default:
5293 LOG_DEBUG(LOGCALLBACK(zh), "Unsupported completion type=%d", cptr->c.type);
5294 break;
5295 }
5296 }
5297
5298 /*---------------------------------------------------------------------------*
5299 * SYNC API
5300 *---------------------------------------------------------------------------*/
5301 int zoo_create(zhandle_t *zh, const char *path, const char *value,
5302 int valuelen, const struct ACL_vector *acl, int mode,
5303 char *path_buffer, int path_buffer_len)
5304 {
5305 return zoo_create_ttl(zh, path, value, valuelen, acl, mode, -1,
5306 path_buffer, path_buffer_len);
5307 }
5308
5309 int zoo_create_ttl(zhandle_t *zh, const char *path, const char *value,
5310 int valuelen, const struct ACL_vector *acl, int mode, int64_t ttl,
5311 char *path_buffer, int path_buffer_len)
5312 {
5313 struct sync_completion *sc = alloc_sync_completion();
5314 int rc;
5315 if (!sc) {
5316 return ZSYSTEMERROR;
5317 }
5318 sc->u.str.str = path_buffer;
5319 sc->u.str.str_len = path_buffer_len;
5320 rc=zoo_acreate_ttl(zh, path, value, valuelen, acl, mode, ttl, SYNCHRONOUS_MARKER, sc);
5321 if(rc==ZOK){
5322 wait_sync_completion(sc);
5323 rc = sc->rc;
5324 }
5325 free_sync_completion(sc);
5326 return rc;
5327 }
5328
5329 int zoo_create2(zhandle_t *zh, const char *path, const char *value,
5330 int valuelen, const struct ACL_vector *acl, int mode,
5331 char *path_buffer, int path_buffer_len, struct Stat *stat)
5332 {
5333 return zoo_create2_ttl(zh, path, value, valuelen, acl, mode, -1,
5334 path_buffer, path_buffer_len, stat);
5335 }
5336
5337 int zoo_create2_ttl(zhandle_t *zh, const char *path, const char *value,
5338 int valuelen, const struct ACL_vector *acl, int mode, int64_t ttl,
5339 char *path_buffer, int path_buffer_len, struct Stat *stat)
5340 {
5341 struct sync_completion *sc = alloc_sync_completion();
5342 int rc;
5343 if (!sc) {
5344 return ZSYSTEMERROR;
5345 }
5346
5347 sc->u.str.str = path_buffer;
5348 sc->u.str.str_len = path_buffer_len;
5349 rc=zoo_acreate2_ttl(zh, path, value, valuelen, acl, mode, ttl, SYNCHRONOUS_MARKER, sc);
5350 if(rc==ZOK){
5351 wait_sync_completion(sc);
5352 rc = sc->rc;
5353 if (rc == 0 && stat) {
5354 *stat = sc->u.stat;
5355 }
5356 }
5357 free_sync_completion(sc);
5358 return rc;
5359 }
5360
5361 int zoo_delete(zhandle_t *zh, const char *path, int version)
5362 {
5363 struct sync_completion *sc = alloc_sync_completion();
5364 int rc;
5365 if (!sc) {
5366 return ZSYSTEMERROR;
5367 }
5368 rc=zoo_adelete(zh, path, version, SYNCHRONOUS_MARKER, sc);
5369 if(rc==ZOK){
5370 wait_sync_completion(sc);
5371 rc = sc->rc;
5372 }
5373 free_sync_completion(sc);
5374 return rc;
5375 }
5376
5377 int zoo_exists(zhandle_t *zh, const char *path, int watch, struct Stat *stat)
5378 {
5379 return zoo_wexists(zh,path,watch?zh->watcher:0,zh->context,stat);
5380 }
5381
5382 int zoo_wexists(zhandle_t *zh, const char *path,
5383 watcher_fn watcher, void* watcherCtx, struct Stat *stat)
5384 {
5385 struct sync_completion *sc = alloc_sync_completion();
5386 int rc;
5387 if (!sc) {
5388 return ZSYSTEMERROR;
5389 }
5390 rc=zoo_awexists(zh,path,watcher,watcherCtx,SYNCHRONOUS_MARKER, sc);
5391 if(rc==ZOK){
5392 wait_sync_completion(sc);
5393 rc = sc->rc;
5394 if (rc == 0&& stat) {
5395 *stat = sc->u.stat;
5396 }
5397 }
5398 free_sync_completion(sc);
5399 return rc;
5400 }
5401
5402 int zoo_get(zhandle_t *zh, const char *path, int watch, char *buffer,
5403 int* buffer_len, struct Stat *stat)
5404 {
5405 return zoo_wget(zh,path,watch?zh->watcher:0,zh->context,
5406 buffer,buffer_len,stat);
5407 }
5408
5409 int zoo_wget(zhandle_t *zh, const char *path,
5410 watcher_fn watcher, void* watcherCtx,
5411 char *buffer, int* buffer_len, struct Stat *stat)
5412 {
5413 struct sync_completion *sc;
5414 int rc=0;
5415
5416 if(buffer_len==NULL)
5417 return ZBADARGUMENTS;
5418 if((sc=alloc_sync_completion())==NULL)
5419 return ZSYSTEMERROR;
5420
5421 sc->u.data.buffer = buffer;
5422 sc->u.data.buff_len = *buffer_len;
5423 rc=zoo_awget(zh, path, watcher, watcherCtx, SYNCHRONOUS_MARKER, sc);
5424 if(rc==ZOK){
5425 wait_sync_completion(sc);
5426 rc = sc->rc;
5427 if (rc == 0) {
5428 if(stat)
5429 *stat = sc->u.data.stat;
5430 *buffer_len = sc->u.data.buff_len;
5431 }
5432 }
5433 free_sync_completion(sc);
5434 return rc;
5435 }
5436
5437 int zoo_getconfig(zhandle_t *zh, int watch, char *buffer,
5438 int* buffer_len, struct Stat *stat)
5439 {
5440 return zoo_wget(zh,ZOO_CONFIG_NODE,watch?zh->watcher:0,zh->context, buffer,buffer_len,stat);
5441 }
5442
5443 int zoo_wgetconfig(zhandle_t *zh, watcher_fn watcher, void* watcherCtx,
5444 char *buffer, int* buffer_len, struct Stat *stat)
5445 {
5446 return zoo_wget(zh, ZOO_CONFIG_NODE, watcher, watcherCtx, buffer, buffer_len, stat);
5447 }
5448
5449
5450 int zoo_reconfig(zhandle_t *zh, const char *joining, const char *leaving,
5451 const char *members, int64_t version, char *buffer, int* buffer_len,
5452 struct Stat *stat)
5453 {
5454 struct sync_completion *sc;
5455 int rc=0;
5456
5457 if(buffer_len==NULL)
5458 return ZBADARGUMENTS;
5459 if((sc=alloc_sync_completion())==NULL)
5460 return ZSYSTEMERROR;
5461
5462 sc->u.data.buffer = buffer;
5463 sc->u.data.buff_len = *buffer_len;
5464 rc=zoo_areconfig(zh, joining, leaving, members, version, SYNCHRONOUS_MARKER, sc);
5465
5466 if(rc==ZOK){
5467 wait_sync_completion(sc);
5468 rc = sc->rc;
5469 if (rc == 0) {
5470 if(stat)
5471 *stat = sc->u.data.stat;
5472 *buffer_len = sc->u.data.buff_len;
5473 }
5474 }
5475 free_sync_completion(sc);
5476 return rc;
5477 }
5478
5479 int zoo_set(zhandle_t *zh, const char *path, const char *buffer, int buflen,
5480 int version)
5481 {
5482 return zoo_set2(zh, path, buffer, buflen, version, 0);
5483 }
5484
5485 int zoo_set2(zhandle_t *zh, const char *path, const char *buffer, int buflen,
5486 int version, struct Stat *stat)
5487 {
5488 struct sync_completion *sc = alloc_sync_completion();
5489 int rc;
5490 if (!sc) {
5491 return ZSYSTEMERROR;
5492 }
5493 rc=zoo_aset(zh, path, buffer, buflen, version, SYNCHRONOUS_MARKER, sc);
5494 if(rc==ZOK){
5495 wait_sync_completion(sc);
5496 rc = sc->rc;
5497 if (rc == 0 && stat) {
5498 *stat = sc->u.stat;
5499 }
5500 }
5501 free_sync_completion(sc);
5502 return rc;
5503 }
5504
5505 static int zoo_wget_children_(zhandle_t *zh, const char *path,
5506 watcher_fn watcher, void* watcherCtx,
5507 struct String_vector *strings)
5508 {
5509 struct sync_completion *sc = alloc_sync_completion();
5510 int rc;
5511 if (!sc) {
5512 return ZSYSTEMERROR;
5513 }
5514 rc= zoo_awget_children (zh, path, watcher, watcherCtx, SYNCHRONOUS_MARKER, sc);
5515 if(rc==ZOK){
5516 wait_sync_completion(sc);
5517 rc = sc->rc;
5518 if (rc == 0) {
5519 if (strings) {
5520 *strings = sc->u.strs2;
5521 } else {
5522 deallocate_String_vector(&sc->u.strs2);
5523 }
5524 }
5525 }
5526 free_sync_completion(sc);
5527 return rc;
5528 }
5529
5530 static int zoo_wget_children2_(zhandle_t *zh, const char *path,
5531 watcher_fn watcher, void* watcherCtx,
5532 struct String_vector *strings, struct Stat *stat)
5533 {
5534 struct sync_completion *sc = alloc_sync_completion();
5535 int rc;
5536 if (!sc) {
5537 return ZSYSTEMERROR;
5538 }
5539 rc= zoo_awget_children2(zh, path, watcher, watcherCtx, SYNCHRONOUS_MARKER, sc);
5540
5541 if(rc==ZOK){
5542 wait_sync_completion(sc);
5543 rc = sc->rc;
5544 if (rc == 0) {
5545 *stat = sc->u.strs_stat.stat2;
5546 if (strings) {
5547 *strings = sc->u.strs_stat.strs2;
5548 } else {
5549 deallocate_String_vector(&sc->u.strs_stat.strs2);
5550 }
5551 }
5552 }
5553 free_sync_completion(sc);
5554 return rc;
5555 }
5556
5557 int zoo_get_children(zhandle_t *zh, const char *path, int watch,
5558 struct String_vector *strings)
5559 {
5560 return zoo_wget_children_(zh,path,watch?zh->watcher:0,zh->context,strings);
5561 }
5562
5563 int zoo_wget_children(zhandle_t *zh, const char *path,
5564 watcher_fn watcher, void* watcherCtx,
5565 struct String_vector *strings)
5566 {
5567 return zoo_wget_children_(zh,path,watcher,watcherCtx,strings);
5568 }
5569
5570 int zoo_get_children2(zhandle_t *zh, const char *path, int watch,
5571 struct String_vector *strings, struct Stat *stat)
5572 {
5573 return zoo_wget_children2_(zh,path,watch?zh->watcher:0,zh->context,strings,stat);
5574 }
5575
5576 int zoo_wget_children2(zhandle_t *zh, const char *path,
5577 watcher_fn watcher, void* watcherCtx,
5578 struct String_vector *strings, struct Stat *stat)
5579 {
5580 return zoo_wget_children2_(zh,path,watcher,watcherCtx,strings,stat);
5581 }
5582
5583 int zoo_get_acl(zhandle_t *zh, const char *path, struct ACL_vector *acl,
5584 struct Stat *stat)
5585 {
5586 struct sync_completion *sc = alloc_sync_completion();
5587 int rc;
5588 if (!sc) {
5589 return ZSYSTEMERROR;
5590 }
5591 rc=zoo_aget_acl(zh, path, SYNCHRONOUS_MARKER, sc);
5592 if(rc==ZOK){
5593 wait_sync_completion(sc);
5594 rc = sc->rc;
5595 if (rc == 0&& stat) {
5596 *stat = sc->u.acl.stat;
5597 }
5598 if (rc == 0) {
5599 if (acl) {
5600 *acl = sc->u.acl.acl;
5601 } else {
5602 deallocate_ACL_vector(&sc->u.acl.acl);
5603 }
5604 }
5605 }
5606 free_sync_completion(sc);
5607 return rc;
5608 }
5609
5610 int zoo_set_acl(zhandle_t *zh, const char *path, int version,
5611 const struct ACL_vector *acl)
5612 {
5613 struct sync_completion *sc = alloc_sync_completion();
5614 int rc;
5615 if (!sc) {
5616 return ZSYSTEMERROR;
5617 }
5618 rc=zoo_aset_acl(zh, path, version, (struct ACL_vector*)acl,
5619 SYNCHRONOUS_MARKER, sc);
5620 if(rc==ZOK){
5621 wait_sync_completion(sc);
5622 rc = sc->rc;
5623 }
5624 free_sync_completion(sc);
5625 return rc;
5626 }
5627
5628 static int remove_watches(
5629 zhandle_t *zh, const char *path, ZooWatcherType wtype,
5630 watcher_fn watcher, void *wctx, int local, int all)
5631 {
5632 int rc = 0;
5633 struct sync_completion *sc;
5634
5635 if (!path)
5636 return ZBADARGUMENTS;
5637
5638 sc = alloc_sync_completion();
5639 if (!sc)
5640 return ZSYSTEMERROR;
5641
5642 rc = aremove_watches(zh, path, wtype, watcher, wctx, local,
5643 SYNCHRONOUS_MARKER, sc, all);
5644 if (rc == ZOK) {
5645 wait_sync_completion(sc);
5646 rc = sc->rc;
5647 }
5648 free_sync_completion(sc);
5649 return rc;
5650 }
5651
5652 int zoo_multi(zhandle_t *zh, int count, const zoo_op_t *ops, zoo_op_result_t *results)
5653 {
5654 int rc;
5655
5656 struct sync_completion *sc = alloc_sync_completion();
5657 if (!sc) {
5658 return ZSYSTEMERROR;
5659 }
5660
5661 rc = zoo_amulti(zh, count, ops, results, SYNCHRONOUS_MARKER, sc);
5662 if (rc == ZOK) {
5663 wait_sync_completion(sc);
5664 rc = sc->rc;
5665 }
5666 free_sync_completion(sc);
5667
5668 return rc;
5669 }
5670
5671 int zoo_remove_watches(zhandle_t *zh, const char *path, ZooWatcherType wtype,
5672 watcher_fn watcher, void *watcherCtx, int local)
5673 {
5674 return remove_watches(zh, path, wtype, watcher, watcherCtx, local, 0);
5675 }
5676
5677 int zoo_remove_all_watches(
5678 zhandle_t *zh, const char *path, ZooWatcherType wtype, int local)
5679 {
5680 return remove_watches(zh, path, wtype, NULL, NULL, local, 1);
5681
5682 }
5683 #endif
5684
5685 int zoo_aremove_watches(zhandle_t *zh, const char *path, ZooWatcherType wtype,
5686 watcher_fn watcher, void *watcherCtx, int local,
5687 void_completion_t *completion, const void *data)
5688 {
5689 return aremove_watches(
5690 zh, path, wtype, watcher, watcherCtx, local, completion, data, 0);
5691 }
5692
5693 int zoo_aremove_all_watches(zhandle_t *zh, const char *path,
5694 ZooWatcherType wtype, int local, void_completion_t *completion,
5695 const void *data)
5696 {
5697 return aremove_watches(
5698 zh, path, wtype, NULL, NULL, local, completion, data, 1);
5699 }
5700