1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 #if !defined(DLL_EXPORT) && !defined(USE_STATIC_LIB)
20 #  define USE_STATIC_LIB
21 #endif
22 
23 #if defined(__CYGWIN__)
24 #define USE_IPV6
25 #endif
26 
27 #include "config.h"
28 #include <zookeeper.h>
29 #include <zookeeper.jute.h>
30 #include <proto.h>
31 #include "zk_adaptor.h"
32 #include "zookeeper_log.h"
33 #include "zk_hashtable.h"
34 
35 #ifdef HAVE_CYRUS_SASL_H
36 #include "zk_sasl.h"
37 #endif /* HAVE_CYRUS_SASL_H */
38 
39 #include <stdlib.h>
40 #include <stdio.h>
41 #include <string.h>
42 #include <time.h>
43 #include <errno.h>
44 #include <fcntl.h>
45 #include <assert.h>
46 #include <stdarg.h>
47 #include <limits.h>
48 
49 #ifdef HAVE_SYS_TIME_H
50 #include <sys/time.h>
51 #endif
52 
53 #ifdef HAVE_SYS_SOCKET_H
54 #include <sys/socket.h>
55 #endif
56 
57 #ifdef HAVE_POLL
58 #include <poll.h>
59 #endif
60 
61 #ifdef HAVE_NETINET_IN_H
62 #include <netinet/in.h>
63 #include <netinet/tcp.h>
64 #endif
65 
66 #ifdef HAVE_ARPA_INET_H
67 #include <arpa/inet.h>
68 #endif
69 
70 #ifdef HAVE_NETDB_H
71 #include <netdb.h>
72 #endif
73 
74 #ifdef HAVE_UNISTD_H
75 #include <unistd.h> // needed for _POSIX_MONOTONIC_CLOCK
76 #endif
77 
78 #ifdef HAVE_SYS_UTSNAME_H
79 #include <sys/utsname.h>
80 #endif
81 
82 #ifdef HAVE_GETPWUID_R
83 #include <pwd.h>
84 #endif
85 
86 #ifdef HAVE_OPENSSL_H
87 #include <openssl/ssl.h>
88 #include <openssl/err.h>
89 #endif
90 
91 #ifdef __MACH__ // OS X
92 #include <mach/clock.h>
93 #include <mach/mach.h>
94 #include <netinet/tcp.h>
95 #endif
96 
97 #ifdef WIN32
98 #include <process.h> /* for getpid */
99 #include <direct.h> /* for getcwd */
100 #define EAI_ADDRFAMILY WSAEINVAL /* is this still needed? */
101 #define EHOSTDOWN EPIPE
102 #define ESTALE ENODEV
103 #endif
104 
/*
 * Execute statement x only when the global logLevel equals
 * ZOO_LOG_LEVEL_DEBUG.
 * NOTE(review): this expands to a bare `if (...) {...}`, so placing it
 * directly in front of an `else` would bind that `else` to the macro's `if`.
 */
#define IF_DEBUG(x) if(logLevel==ZOO_LOG_LEVEL_DEBUG) {x;}

/* Single-bit flags (bit 0 and bit 1), so the two can be OR-ed together. */
const int ZOOKEEPER_WRITE = 1 << 0;
const int ZOOKEEPER_READ = 1 << 1;

/*
 * Mode values 0..6.  These are plain enumerated values, not bit flags;
 * the ZOOKEEPER_IS_* macros below classify them.
 */
const int ZOO_PERSISTENT = 0;
const int ZOO_EPHEMERAL = 1;
const int ZOO_PERSISTENT_SEQUENTIAL = 2;
const int ZOO_EPHEMERAL_SEQUENTIAL = 3;
const int ZOO_CONTAINER = 4;
const int ZOO_PERSISTENT_WITH_TTL = 5;
const int ZOO_PERSISTENT_SEQUENTIAL_WITH_TTL = 6;

/* True when mode is one of the three *_SEQUENTIAL* modes above. */
#define ZOOKEEPER_IS_SEQUENCE(mode) \
    ((mode) == ZOO_PERSISTENT_SEQUENTIAL || \
     (mode) == ZOO_EPHEMERAL_SEQUENTIAL || \
     (mode) == ZOO_PERSISTENT_SEQUENTIAL_WITH_TTL)
/* True when mode is one of the two *_WITH_TTL modes above. */
#define ZOOKEEPER_IS_TTL(mode) \
    ((mode) == ZOO_PERSISTENT_WITH_TTL || \
     (mode) == ZOO_PERSISTENT_SEQUENTIAL_WITH_TTL)

// keep ZOO_SEQUENCE as a bitmask for compatibility reasons
// (its value, 1 << 1, is numerically equal to ZOO_PERSISTENT_SEQUENTIAL)
const int ZOO_SEQUENCE = 1 << 1;

/* Upper bound for a TTL value: the low 40 bits set (units not shown here). */
#define ZOO_MAX_TTL 0xFFFFFFFFFFLL

/*
 * Exported state constants.  Each one simply mirrors the corresponding
 * *_STATE_DEF macro, which is defined outside this part of the file.
 */
const int ZOO_EXPIRED_SESSION_STATE = EXPIRED_SESSION_STATE_DEF;
const int ZOO_AUTH_FAILED_STATE = AUTH_FAILED_STATE_DEF;
const int ZOO_CONNECTING_STATE = CONNECTING_STATE_DEF;
const int ZOO_ASSOCIATING_STATE = ASSOCIATING_STATE_DEF;
const int ZOO_CONNECTED_STATE = CONNECTED_STATE_DEF;
const int ZOO_READONLY_STATE = READONLY_STATE_DEF;
const int ZOO_SSL_CONNECTING_STATE = SSL_CONNECTING_STATE_DEF;
const int ZOO_NOTCONNECTED_STATE = NOTCONNECTED_STATE_DEF;
139 
state2String(int state)140 static __attribute__ ((unused)) const char* state2String(int state){
141     switch(state){
142     case 0:
143         return "ZOO_CLOSED_STATE";
144     case CONNECTING_STATE_DEF:
145         return "ZOO_CONNECTING_STATE";
146     case SSL_CONNECTING_STATE_DEF:
147         return "ZOO_SSL_CONNECTING_STATE";
148     case ASSOCIATING_STATE_DEF:
149         return "ZOO_ASSOCIATING_STATE";
150     case CONNECTED_STATE_DEF:
151         return "ZOO_CONNECTED_STATE";
152     case READONLY_STATE_DEF:
153         return "ZOO_READONLY_STATE";
154     case EXPIRED_SESSION_STATE_DEF:
155         return "ZOO_EXPIRED_SESSION_STATE";
156     case AUTH_FAILED_STATE_DEF:
157         return "ZOO_AUTH_FAILED_STATE";
158     }
159     return "INVALID_STATE";
160 }
161 
/*
 * Exported event-type constants.  Each one mirrors the corresponding
 * *_EVENT_DEF macro, which is defined outside this part of the file.
 */
const int ZOO_CREATED_EVENT = CREATED_EVENT_DEF;
const int ZOO_DELETED_EVENT = DELETED_EVENT_DEF;
const int ZOO_CHANGED_EVENT = CHANGED_EVENT_DEF;
const int ZOO_CHILD_EVENT = CHILD_EVENT_DEF;
const int ZOO_SESSION_EVENT = SESSION_EVENT_DEF;
const int ZOO_NOTWATCHING_EVENT = NOTWATCHING_EVENT_DEF;
watcherEvent2String(int ev)168 static __attribute__ ((unused)) const char* watcherEvent2String(int ev){
169     switch(ev){
170     case 0:
171         return "ZOO_ERROR_EVENT";
172     case CREATED_EVENT_DEF:
173         return "ZOO_CREATED_EVENT";
174     case DELETED_EVENT_DEF:
175         return "ZOO_DELETED_EVENT";
176     case CHANGED_EVENT_DEF:
177         return "ZOO_CHANGED_EVENT";
178     case CHILD_EVENT_DEF:
179         return "ZOO_CHILD_EVENT";
180     case SESSION_EVENT_DEF:
181         return "ZOO_SESSION_EVENT";
182     case NOTWATCHING_EVENT_DEF:
183         return "ZOO_NOTWATCHING_EVENT";
184     }
185     return "INVALID_EVENT";
186 }
187 
/* Permission bits; each occupies its own bit so they can be OR-ed. */
const int ZOO_PERM_READ = 1 << 0;
const int ZOO_PERM_WRITE = 1 << 1;
const int ZOO_PERM_CREATE = 1 << 2;
const int ZOO_PERM_DELETE = 1 << 3;
const int ZOO_PERM_ADMIN = 1 << 4;
/* 0x1f is the OR of the five permission bits above. */
const int ZOO_PERM_ALL = 0x1f;
/* Predefined ids: {"world", "anyone"} and {"auth", ""}. */
struct Id ZOO_ANYONE_ID_UNSAFE = {"world", "anyone"};
struct Id ZOO_AUTH_IDS = {"auth", ""};
/*
 * Backing storage for the one-element ACL vectors exported below.
 * 0x1f equals ZOO_PERM_ALL and 0x01 equals ZOO_PERM_READ; the literals are
 * repeated here rather than referencing the const ints above.
 */
static struct ACL _OPEN_ACL_UNSAFE_ACL[] = {{0x1f, {"world", "anyone"}}};
static struct ACL _READ_ACL_UNSAFE_ACL[] = {{0x01, {"world", "anyone"}}};
static struct ACL _CREATOR_ALL_ACL_ACL[] = {{0x1f, {"auth", ""}}};
/* Exported ACL vectors: element count 1, pointing at the arrays above. */
struct ACL_vector ZOO_OPEN_ACL_UNSAFE = { 1, _OPEN_ACL_UNSAFE_ACL};
struct ACL_vector ZOO_READ_ACL_UNSAFE = { 1, _READ_ACL_UNSAFE_ACL};
struct ACL_vector ZOO_CREATOR_ALL_ACL = { 1, _CREATOR_ALL_ACL_ACL};
202 
/*
 * Discriminator values stored in completion_t.type (see the struct below in
 * this file).  Presumably each value selects the matching *_result member of
 * the union in completion_t -- confirm against the code that dispatches
 * completions.
 */
#define COMPLETION_WATCH -1
#define COMPLETION_VOID 0
#define COMPLETION_STAT 1
#define COMPLETION_DATA 2
#define COMPLETION_STRINGLIST 3
#define COMPLETION_STRINGLIST_STAT 4
#define COMPLETION_ACLLIST 5
#define COMPLETION_STRING 6
#define COMPLETION_MULTI 7
#define COMPLETION_STRING_STAT 8
213 
/*
 * Singly linked list of auth completion callbacks.  The helpers in this file
 * treat the first node as caller-owned (it is reset, never freed) and
 * malloc() every following node.
 */
typedef struct _auth_completion_list {
    void_completion_t completion;        /* callback; NULL marks an unused head node */
    const char *auth_data;               /* pointer stored alongside the callback; not owned by the list */
    struct _auth_completion_list *next;  /* next node, or NULL at the tail */
} auth_completion_list_t;
219 
/*
 * A completion callback tagged with its kind.  Exactly one member of the
 * anonymous union is meaningful, selected by `type`.
 */
typedef struct completion {
    int type; /* one of COMPLETION_* values above */
    union {
        void_completion_t void_result;
        stat_completion_t stat_result;
        data_completion_t data_result;
        strings_completion_t strings_result;
        strings_stat_completion_t strings_stat_result;
        acl_completion_t acl_result;
        string_completion_t string_result;
        string_stat_completion_t string_stat_result;
        struct watcher_object_list *watcher_result; /* presumably used with COMPLETION_WATCH -- confirm */
    };
    completion_head_t clist; /* For multi-op */
} completion_t;
235 
/*
 * One queued completion entry; entries are chained through `next`.
 */
typedef struct _completion_list {
    int xid;                        /* transaction id associated with this entry */
    completion_t c;                 /* the callback and its kind */
    const void *data;               /* opaque pointer kept with the callback */
    buffer_list_t *buffer;          /* buffer attached to this entry, if any */
    struct _completion_list *next;  /* next entry in the queue, or NULL */
    watcher_registration_t* watcher;                   /* watcher registration attached to the request, if any */
    watcher_deregistration_t* watcher_deregistration;  /* watcher deregistration attached to the request, if any */
} completion_list_t;
245 
/* Forward declarations for functions defined later in this file. */
const char*err2string(int err);
static inline int calculate_interval(const struct timeval *start,
        const struct timeval *end);
static int queue_session_event(zhandle_t *zh, int state);
static const char* format_endpoint_info(const struct sockaddr_storage* ep);

/* deserialize forward declarations */
static void deserialize_response(zhandle_t *zh, int type, int xid, int failed, int rc, completion_list_t *cptr, struct iarchive *ia);
static int deserialize_multi(zhandle_t *zh, int xid, completion_list_t *cptr, struct iarchive *ia);

/* completion routine forward declarations */
static int add_completion(zhandle_t *zh, int xid, int completion_type,
        const void *dc, const void *data, int add_to_front,
        watcher_registration_t* wo, completion_head_t *clist);
static int add_completion_deregistration(zhandle_t *zh, int xid,
        int completion_type, const void *dc, const void *data,
        int add_to_front, watcher_deregistration_t* wo,
        completion_head_t *clist);
static int do_add_completion(zhandle_t *zh, const void *dc, completion_list_t *c,
        int add_to_front);

static completion_list_t* create_completion_entry(zhandle_t *zh, int xid, int completion_type,
        const void *dc, const void *data, watcher_registration_t* wo,
        completion_head_t *clist);
static completion_list_t* create_completion_entry_deregistration(zhandle_t *zh,
        int xid, int completion_type, const void *dc, const void *data,
        watcher_deregistration_t* wo, completion_head_t *clist);
static completion_list_t* do_create_completion_entry(zhandle_t *zh,
        int xid, int completion_type, const void *dc, const void *data,
        watcher_registration_t* wo, completion_head_t *clist,
        watcher_deregistration_t* wdo);
static void destroy_completion_entry(completion_list_t* c);
static void queue_completion_nolock(completion_head_t *list, completion_list_t *c,
        int add_to_front);
static void queue_completion(completion_head_t *list, completion_list_t *c,
        int add_to_front);
static int handle_socket_error_msg(zhandle_t *zh, int line, int rc,
    const char* format,...);
static void cleanup_bufs(zhandle_t *zh,int callCompletion,int rc);

/* File-scope state. */
static int disable_conn_permute=0; // permute enabled by default
static struct sockaddr_storage *addr_rw_server = 0;

/* Sentinel pointer: it holds its own address, so it equals no other object's address. */
static void *SYNCHRONOUS_MARKER = (void*)&SYNCHRONOUS_MARKER;
static int isValidPath(const char* path, const int mode);
#ifdef HAVE_OPENSSL_H
/* SSL setup helpers, only declared when OpenSSL support is compiled in. */
static int init_ssl_for_handler(zhandle_t *zh);
static int init_ssl_for_socket(zsock_t *fd, zhandle_t *zh, int fail_on_error);
#endif

static int aremove_watches(
    zhandle_t *zh, const char *path, ZooWatcherType wtype,
    watcher_fn watcher, void *watcherCtx, int local,
    void_completion_t *completion, const void *data, int all);
300 
#ifdef THREADED
/* Forward declarations that exist only in the multi-threaded (THREADED) build. */
static void process_sync_completion(zhandle_t *zh,
        completion_list_t *cptr,
        struct sync_completion *sc,
        struct iarchive *ia);

static int remove_watches(
    zhandle_t *zh, const char *path, ZooWatcherType wtype,
    watcher_fn watcher, void *watcherCtx, int local, int all);
#endif
311 
/*
 * Per-platform socket handle type, send() return type, and the flags value
 * that zookeeper_send() passes as the last argument of send().
 */
#ifdef _WIN32
typedef SOCKET socket_t;
typedef int sendsize_t;
#define SEND_FLAGS  0
#else
#ifdef __APPLE__
/* NOTE(review): SO_NOSIGPIPE is normally a setsockopt() option name rather
 * than a send() MSG_* flag; confirm that passing it to send() is intended. */
#define SEND_FLAGS SO_NOSIGPIPE
#endif
#ifdef __linux__
#define SEND_FLAGS MSG_NOSIGNAL
#endif
/* Any other platform: no special flags. */
#ifndef SEND_FLAGS
#define SEND_FLAGS 0
#endif
typedef int socket_t;
typedef ssize_t sendsize_t;
#endif
329 
/* Socket configuration/connect helpers; defined later in this file. */
static void zookeeper_set_sock_nodelay(zhandle_t *, socket_t);
static void zookeeper_set_sock_noblock(zhandle_t *, socket_t);
static void zookeeper_set_sock_timeout(zhandle_t *, socket_t, int);
static socket_t zookeeper_connect(zhandle_t *, struct sockaddr_storage *, socket_t);
334 
335 /*
336  * return 1 if zh has a SASL client configured, 0 otherwise.
337  */
has_sasl_client(zhandle_t * zh)338 static int has_sasl_client(zhandle_t* zh)
339 {
340 #ifdef HAVE_CYRUS_SASL_H
341     return zh->sasl_client != NULL;
342 #else /* !HAVE_CYRUS_SASL_H */
343     return 0;
344 #endif /* HAVE_CYRUS_SASL_H */
345 }
346 
347 /*
348  * return 1 if zh has a SASL client performing authentication, 0 otherwise.
349  */
is_sasl_auth_in_progress(zhandle_t * zh)350 static int is_sasl_auth_in_progress(zhandle_t* zh)
351 {
352 #ifdef HAVE_CYRUS_SASL_H
353     return zh->sasl_client && zh->sasl_client->state == ZOO_SASL_INTERMEDIATE;
354 #else /* !HAVE_CYRUS_SASL_H */
355     return 0;
356 #endif /* HAVE_CYRUS_SASL_H */
357 }
358 
359 /*
360  * Extract the type field (ZOO_*_OP) of a serialized RequestHeader.
361  *
362  * (This is not the most efficient way of fetching 4 bytes, but it is
363  * currently only used during SASL negotiation.)
364  *
365  * \param buffer the buffer to extract the request type from.  Must
366  *   start with a serialized RequestHeader;
367  * \param len the buffer length.  Must be positive.
368  * \param out_type out parameter; pointer to the location where the
369  *   extracted type is to be stored.  Cannot be NULL.
370  * \return ZOK on success, or < 0 if something went wrong
371  */
extract_request_type(char * buffer,int len,int32_t * out_type)372 static int extract_request_type(char *buffer, int len, int32_t *out_type)
373 {
374     struct iarchive *ia;
375     struct RequestHeader h;
376     int rc;
377 
378     ia = create_buffer_iarchive(buffer, len);
379     rc = ia ? ZOK : ZSYSTEMERROR;
380     rc = rc < 0 ? rc : deserialize_RequestHeader(ia, "header", &h);
381     deallocate_RequestHeader(&h);
382     if (ia) {
383         close_buffer_iarchive(&ia);
384     }
385 
386     *out_type = h.type;
387 
388     return rc;
389 }
390 
#ifndef THREADED
/*
 * abort due to the use of a sync api in a singlethreaded environment
 *
 * Logs an error through the handle's log callback and then calls abort(),
 * so this function never returns.  Only compiled when THREADED is not
 * defined.
 */
static void abort_singlethreaded(zhandle_t *zh)
{
    LOG_ERROR(LOGCALLBACK(zh), "Sync completion used without threads");
    abort();
}
#endif  /* !THREADED */
401 
/*
 * Write bytes to the connection, through SSL when an SSL session is
 * attached to the socket and through plain send() otherwise.
 *
 * \param fd the socket wrapper to write to.
 * \param buf the bytes to write.
 * \param len number of bytes available in buf.
 * \return the value returned by SSL_write() or send(): the number of bytes
 *   written (possibly fewer than len), or a value <= 0 / -1 on error.
 */
static ssize_t zookeeper_send(zsock_t *fd, const void* buf, size_t len)
{
#ifdef HAVE_OPENSSL_H
    if (fd->ssl_sock) {
        /* SSL_write() takes an int length: clamp instead of letting the
         * cast wrap for len > INT_MAX.  The caller sees a short write. */
        int chunk = len > (size_t)INT_MAX ? INT_MAX : (int)len;
        return (ssize_t)SSL_write(fd->ssl_sock, buf, chunk);
    }
#endif
    return send(fd->sock, buf, len, SEND_FLAGS);
}
410 
/*
 * Read bytes from the connection, through SSL when an SSL session is
 * attached to the socket and through plain recv() otherwise.
 *
 * \param fd the socket wrapper to read from.
 * \param buf destination buffer.
 * \param len capacity of buf in bytes.
 * \param flags passed to recv(); ignored on the SSL path.
 * \return the value returned by SSL_read() or recv(): the number of bytes
 *   read (possibly fewer than len), or a value <= 0 / -1 on error or EOF.
 */
static ssize_t zookeeper_recv(zsock_t *fd, void *buf, size_t len, int flags)
{
#ifdef HAVE_OPENSSL_H
    if (fd->ssl_sock) {
        /* SSL_read() takes an int length: clamp instead of letting the
         * cast wrap for len > INT_MAX.  The caller sees a short read. */
        int chunk = len > (size_t)INT_MAX ? INT_MAX : (int)len;
        return (ssize_t)SSL_read(fd->ssl_sock, buf, chunk);
    }
#endif
    return recv(fd->sock, buf, len, flags);
}
419 
/**
 * Get the system time.
 *
 * If the monotonic clock is available, we use that.  The monotonic clock does
 * not change when the wall-clock time is adjusted by NTP or the system
 * administrator.  The monotonic clock returns a value which is monotonically
 * increasing.
 *
 * If POSIX monotonic clocks are not available, we fall back on the wall-clock.
 *
 * The process is aborted when the selected clock source reports an error.
 *
 * @param tv         (out param) The time.  tv_usec is always in the range
 *                   [0, 999999].
 */
void get_system_time(struct timeval *tv)
{
  int ret;

#ifdef __MACH__ // OS X
  clock_serv_t cclock;
  mach_timespec_t mts;
  ret = host_get_clock_service(mach_host_self(), SYSTEM_CLOCK, &cclock);
  if (!ret) {
    ret += clock_get_time(cclock, &mts);
    ret += mach_port_deallocate(mach_task_self(), cclock);
    if (!ret) {
      tv->tv_sec = mts.tv_sec;
      tv->tv_usec = mts.tv_nsec / 1000;
    }
  }
  if (ret) {
    // Default to gettimeofday in case of failure.
    ret = gettimeofday(tv, NULL);
  }
#elif defined CLOCK_MONOTONIC_RAW
  // On Linux, CLOCK_MONOTONIC is affected by ntp slew but CLOCK_MONOTONIC_RAW
  // is not.  We want the non-slewed (constant rate) CLOCK_MONOTONIC_RAW if it
  // is available.
  struct timespec ts = { 0 };
  ret = clock_gettime(CLOCK_MONOTONIC_RAW, &ts);
  tv->tv_sec = ts.tv_sec;
  tv->tv_usec = ts.tv_nsec / 1000;
#elif _POSIX_MONOTONIC_CLOCK
  struct timespec ts = { 0 };
  ret = clock_gettime(CLOCK_MONOTONIC, &ts);
  tv->tv_sec = ts.tv_sec;
  tv->tv_usec = ts.tv_nsec / 1000;
#elif _WIN32
  LARGE_INTEGER counts, countsPerSecond;
  if (QueryPerformanceFrequency(&countsPerSecond) &&
      countsPerSecond.QuadPart > 0 &&
      QueryPerformanceCounter(&counts)) {
    tv->tv_sec = (long)(counts.QuadPart / countsPerSecond.QuadPart);
    // Scale the sub-second remainder to microseconds before dividing by the
    // frequency.  Dividing by (frequency / 1000000) instead would divide by
    // zero for frequencies below 1 MHz and produce tv_usec >= 1000000 for
    // frequencies that are not a multiple of 1 MHz.
    tv->tv_usec = (long)(((counts.QuadPart % countsPerSecond.QuadPart) *
        1000000) / countsPerSecond.QuadPart);
    ret = 0;
  } else {
    ret = gettimeofday(tv, NULL);
  }
#else
  ret = gettimeofday(tv, NULL);
#endif
  if (ret) {
    abort();
  }
}
484 
/**
 * Return the context pointer stored in the handle.
 *
 * \param zh the handle; may be NULL.
 * \return zh->context, or NULL when zh is NULL (mirroring the NULL
 *   tolerance of zoo_set_context).
 */
const void *zoo_get_context(zhandle_t *zh)
{
    if (zh == NULL) {
        return NULL;
    }
    return zh->context;
}
489 
/**
 * Store a context pointer in the handle.
 *
 * \param zh the handle; a NULL handle is silently ignored.
 * \param context the pointer to store in zh->context.
 */
void zoo_set_context(zhandle_t *zh, void *context)
{
    if (!zh) {
        return;
    }
    zh->context = context;
}
496 
/**
 * Return the recv_timeout value stored in the handle.
 *
 * \param zh the handle; dereferenced without a NULL check.
 * \return zh->recv_timeout.
 */
int zoo_recv_timeout(zhandle_t *zh)
{
    return zh->recv_timeout;
}
501 
502 /** these functions are thread unsafe, so make sure that
503     zoo_lock_auth is called before you access them **/
get_last_auth(auth_list_head_t * auth_list)504 static auth_info* get_last_auth(auth_list_head_t *auth_list) {
505     auth_info *element;
506     element = auth_list->auth;
507     if (element == NULL) {
508         return NULL;
509     }
510     while (element->next != NULL) {
511         element = element->next;
512     }
513     return element;
514 }
515 
/*
 * Release an auth completion list.
 *
 * Every node after the head is cleared and free()d.  The head node itself
 * is not freed; its fields are reset to NULL so it can be reused.  A NULL
 * a_list is ignored.
 */
static void free_auth_completion(auth_completion_list_t *a_list) {
    auth_completion_list_t *node;
    auth_completion_list_t *following;

    if (!a_list) {
        return;
    }

    for (node = a_list->next; node != NULL; node = following) {
        /* remember the successor before the node goes away */
        following = node->next;
        node->completion = NULL;
        node->auth_data = NULL;
        free(node);
    }

    a_list->completion = NULL;
    a_list->auth_data = NULL;
    a_list->next = NULL;
}
534 
/*
 * Append a completion callback and its auth data to an auth completion list.
 *
 * The head node is used in place when it is still empty (its completion is
 * NULL); otherwise a new node is allocated and linked after the current
 * tail.
 *
 * \param a_list head of the list; must not be NULL.
 * \param completion pointer to the callback to record; the callback value
 *   is copied.
 * \param data pointer stored alongside the callback; not copied.
 *
 * If the node cannot be allocated the entry is dropped: the function
 * returns void and has no handle to report through, so this is the only
 * alternative to dereferencing NULL.
 */
static void add_auth_completion(auth_completion_list_t* a_list, void_completion_t* completion,
                                const char *data) {
    auth_completion_list_t *tail;
    auth_completion_list_t *n_element;

    if (a_list->completion == NULL) {
        //this is the first element
        a_list->completion = *completion;
        a_list->next = NULL;
        a_list->auth_data = data;
        return;
    }

    n_element = (auth_completion_list_t*) malloc(sizeof(auth_completion_list_t));
    if (n_element == NULL) {
        /* out of memory: leave the list untouched */
        return;
    }
    n_element->next = NULL;
    n_element->completion = *completion;
    n_element->auth_data = data;

    /* find the current tail and link the new node behind it */
    tail = a_list;
    while (tail->next != NULL) {
        tail = tail->next;
    }
    tail->next = n_element;
}
557 
/*
 * Move every pending completion callback from the auth list into a_list.
 *
 * For each auth entry with a non-NULL completion, the callback and the
 * entry's data pointer are appended to a_list.  Every entry's completion is
 * then set to NULL.
 */
static void get_auth_completions(auth_list_head_t *auth_list, auth_completion_list_t *a_list) {
    auth_info *entry;

    for (entry = auth_list->auth; entry != NULL; entry = entry->next) {
        if (entry->completion) {
            add_auth_completion(a_list, &entry->completion, entry->data);
        }
        entry->completion = NULL;
    }
}
573 
/*
 * Link add_el at the end of the auth list.
 *
 * add_el becomes the list head when the list is empty.  add_el->next is
 * left as the caller set it.
 */
static void add_last_auth(auth_list_head_t *auth_list, auth_info *add_el) {
    /* points at the link to overwrite: the head slot first, then each
     * element's next field in turn */
    auth_info **slot = &auth_list->auth;

    while (*slot != NULL) {
        slot = &(*slot)->next;
    }
    *slot = add_el;
}
588 
/*
 * Reset an auth list head to the empty state by setting its element pointer
 * to NULL.  Elements previously linked are not freed here.
 */
static void init_auth_info(auth_list_head_t *auth_list)
{
    auth_list->auth = NULL;
}
593 
/*
 * Set the state field of every entry in the handle's auth list to 1.
 * Does nothing when the list is empty.
 */
static void mark_active_auth(zhandle_t *zh) {
    auth_info *entry;

    for (entry = zh->auth_h.auth; entry != NULL; entry = entry->next) {
        entry->state = 1;
    }
}
606 
/*
 * Free every element of an auth list and reset the list head.
 *
 * For each element the scheme string is freed, its auth buffer is released
 * through deallocate_Buffer, and then the element itself is freed.
 */
static void free_auth_info(auth_list_head_t *auth_list)
{
    auth_info *current = auth_list->auth;

    while (current != NULL) {
        /* grab the successor first; current is about to be freed */
        auth_info *following = current->next;

        if (current->scheme != NULL) {
            free(current->scheme);
        }
        deallocate_Buffer(&current->auth);
        free(current);

        current = following;
    }
    init_auth_info(auth_list);
}
621 
/*
 * Report whether the handle is in a negative (error) state.
 *
 * Returns ZINVALIDSTATE when zh->state is below zero, ZOK otherwise.
 */
int is_unrecoverable(zhandle_t *zh)
{
    if (zh->state < 0) {
        return ZINVALIDSTATE;
    }
    return ZOK;
}
626 
exists_result_checker(zhandle_t * zh,int rc)627 zk_hashtable *exists_result_checker(zhandle_t *zh, int rc)
628 {
629     if (rc == ZOK) {
630         return zh->active_node_watchers;
631     } else if (rc == ZNONODE) {
632         return zh->active_exist_watchers;
633     }
634     return 0;
635 }
636 
data_result_checker(zhandle_t * zh,int rc)637 zk_hashtable *data_result_checker(zhandle_t *zh, int rc)
638 {
639     return rc==ZOK ? zh->active_node_watchers : 0;
640 }
641 
child_result_checker(zhandle_t * zh,int rc)642 zk_hashtable *child_result_checker(zhandle_t *zh, int rc)
643 {
644     return rc==ZOK ? zh->active_child_watchers : 0;
645 }
646 
/*
 * Close the socket held by fd, if it is open.
 *
 * When an SSL session is attached, the session and its context are freed
 * and their pointers cleared before the descriptor is closed.  Afterwards
 * fd->sock is set to -1, so calling this again is a no-op.
 */
void close_zsock(zsock_t *fd)
{
    if (fd->sock == -1) {
        /* nothing open */
        return;
    }

#ifdef HAVE_OPENSSL_H
    if (fd->ssl_sock) {
        SSL_free(fd->ssl_sock);
        fd->ssl_sock = NULL;
        SSL_CTX_free(fd->ssl_ctx);
        fd->ssl_ctx = NULL;
    }
#endif

    close(fd->sock);
    fd->sock = -1;
}
662 
663 /**
664  * Frees and closes everything associated with a handle,
665  * including the handle itself.
666  */
destroy(zhandle_t * zh)667 static void destroy(zhandle_t *zh)
668 {
669     if (zh == NULL) {
670         return;
671     }
672     /* call any outstanding completions with a special error code */
673     cleanup_bufs(zh,1,ZCLOSING);
674     if (process_async(zh->outstanding_sync)) {
675         process_completions(zh);
676     }
677     if (zh->hostname != 0) {
678         free(zh->hostname);
679         zh->hostname = NULL;
680     }
681     if (zh->fd->sock != -1) {
682         close_zsock(zh->fd);
683         memset(&zh->addr_cur, 0, sizeof(zh->addr_cur));
684         zh->state = 0;
685     }
686     addrvec_free(&zh->addrs);
687 
688     if (zh->chroot != NULL) {
689         free(zh->chroot);
690         zh->chroot = NULL;
691     }
692 #ifdef HAVE_OPENSSL_H
693     if (zh->fd->cert) {
694         free(zh->fd->cert->certstr);
695         free(zh->fd->cert);
696         zh->fd->cert = NULL;
697     }
698 #endif
699     free_auth_info(&zh->auth_h);
700     destroy_zk_hashtable(zh->active_node_watchers);
701     destroy_zk_hashtable(zh->active_exist_watchers);
702     destroy_zk_hashtable(zh->active_child_watchers);
703     addrvec_free(&zh->addrs_old);
704     addrvec_free(&zh->addrs_new);
705 
706 #ifdef HAVE_CYRUS_SASL_H
707     if (zh->sasl_client) {
708         zoo_sasl_client_destroy(zh->sasl_client);
709         free(zh->sasl_client);
710         zh->sasl_client = NULL;
711     }
712 #endif /* HAVE_CYRUS_SASL_H */
713 }
714 
/*
 * Seed the random()/drand48() generators.
 *
 * The seed is read from /dev/urandom.  When the device cannot be opened, or
 * a full seed cannot be read from it, the process id is used instead.
 * Does nothing on Windows.
 */
static void setup_random()
{
#ifndef _WIN32          // TODO: better seed
    int seed = 0;
    int have_seed = 0;
    int fd = open("/dev/urandom", O_RDONLY);

    if (fd != -1) {
        /* Fill the seed byte by byte: the offset must advance in bytes,
         * not in units of sizeof(int). */
        unsigned char *dst = (unsigned char *)&seed;
        size_t seed_len = 0;

        /* Loop so that short reads and signal interruptions (EINTR) are
         * handled; stop on EOF or any other error rather than spinning. */
        while (seed_len < sizeof(seed)) {
            ssize_t rc = read(fd, dst + seed_len, sizeof(seed) - seed_len);
            if (rc > 0) {
                seed_len += (size_t)rc;
            } else if (rc == 0 || errno != EINTR) {
                break;
            }
        }
        close(fd);
        have_seed = (seed_len == sizeof(seed));
    }

    if (!have_seed) {
        seed = getpid();
    }
    srandom(seed);
    srand48(seed);
#endif
}
745 
#ifndef __CYGWIN__
/**
 * Translate a getaddrinfo() return code into an errno value.
 * getaddrinfo() does not set errno itself, so callers that want one
 * have to derive it from the return code.
 *
 * EAI_NONAME (and EAI_NODATA where it exists as a distinct value) map to
 * ENOENT, EAI_MEMORY maps to ENOMEM, everything else maps to EINVAL.
 */
static int getaddrinfo_errno(int rc) {
    if (rc == EAI_MEMORY) {
        return ENOMEM;
    }
    if (rc == EAI_NONAME) {
        return ENOENT;
    }
// ZOOKEEPER-1323 EAI_NODATA and EAI_ADDRFAMILY are deprecated in FreeBSD.
#if defined EAI_NODATA && EAI_NODATA != EAI_NONAME
    if (rc == EAI_NODATA) {
        return ENOENT;
    }
#endif
    return EINVAL;
}
#endif
768 
/**
 * Count the hosts in a connection string by counting its comma separators:
 * the result is the number of commas plus one.  A NULL or empty string
 * yields 0.  The string is assumed to be well formed; empty items between
 * consecutive commas are counted like any other.
 */
static int count_hosts(char *hosts)
{
    uint32_t separators = 0;
    char *comma;

    if (hosts == NULL || *hosts == '\0') {
        return 0;
    }

    /* hop from comma to comma until none is left */
    for (comma = strchr(hosts, ','); comma != NULL; comma = strchr(comma + 1, ',')) {
        separators++;
    }

    return separators + 1;
}
788 
789 /**
790  * Resolve hosts and populate provided address vector with shuffled results.
791  * The contents of the provided address vector will be initialized to an
792  * empty state.
793  */
resolve_hosts(const zhandle_t * zh,const char * hosts_in,addrvec_t * avec)794 static int resolve_hosts(const zhandle_t *zh, const char *hosts_in, addrvec_t *avec)
795 {
796     int rc = ZOK;
797     char *host = NULL;
798     char *hosts = NULL;
799     int num_hosts = 0;
800     char *strtok_last = NULL;
801 
802     if (zh == NULL || hosts_in == NULL || avec == NULL) {
803         return ZBADARGUMENTS;
804     }
805 
806     // initialize address vector
807     addrvec_init(avec);
808 
809     hosts = strdup(hosts_in);
810     if (hosts == NULL) {
811         LOG_ERROR(LOGCALLBACK(zh), "out of memory");
812         errno=ENOMEM;
813         rc=ZSYSTEMERROR;
814         goto fail;
815     }
816 
817     num_hosts = count_hosts(hosts);
818     if (num_hosts == 0) {
819         free(hosts);
820         return ZOK;
821     }
822 
823     // Allocate list inside avec
824     rc = addrvec_alloc_capacity(avec, num_hosts);
825     if (rc != 0) {
826         LOG_ERROR(LOGCALLBACK(zh), "out of memory");
827         errno=ENOMEM;
828         rc=ZSYSTEMERROR;
829         goto fail;
830     }
831 
832     host = strtok_r(hosts, ",", &strtok_last);
833     while(host) {
834         char *port_spec = strrchr(host, ':');
835         char *end_port_spec;
836         int port;
837         if (!port_spec) {
838             LOG_ERROR(LOGCALLBACK(zh), "no port in %s", host);
839             errno=EINVAL;
840             rc=ZBADARGUMENTS;
841             goto fail;
842         }
843         *port_spec = '\0';
844         port_spec++;
845         port = strtol(port_spec, &end_port_spec, 0);
846         if (!*port_spec || *end_port_spec || port == 0) {
847             LOG_ERROR(LOGCALLBACK(zh), "invalid port in %s", host);
848             errno=EINVAL;
849             rc=ZBADARGUMENTS;
850             goto fail;
851         }
852 #if defined(__CYGWIN__)
853         // sadly CYGWIN doesn't have getaddrinfo
854         // but happily gethostbyname is threadsafe in windows
855         {
856         struct hostent *he;
857         char **ptr;
858         struct sockaddr_in *addr4;
859 
860         he = gethostbyname(host);
861         if (!he) {
862             LOG_ERROR(LOGCALLBACK(zh), "could not resolve %s", host);
863             errno=ENOENT;
864             rc=ZBADARGUMENTS;
865             goto fail;
866         }
867 
868         // Setup the address array
869         for(ptr = he->h_addr_list;*ptr != 0; ptr++) {
870             if (addrs->count == addrs->capacity) {
871                 rc = addrvec_grow_default(addrs);
872                 if (rc != 0) {
873                     LOG_ERROR(LOGCALLBACK(zh), "out of memory");
874                     errno=ENOMEM;
875                     rc=ZSYSTEMERROR;
876                     goto fail;
877                 }
878             }
879             addr = &addrs->list[addrs->count];
880             addr4 = (struct sockaddr_in*)addr;
881             addr->ss_family = he->h_addrtype;
882             if (addr->ss_family == AF_INET) {
883                 addr4->sin_port = htons(port);
884                 memset(&addr4->sin_zero, 0, sizeof(addr4->sin_zero));
885                 memcpy(&addr4->sin_addr, *ptr, he->h_length);
886                 zh->addrs.count++;
887             }
888 #if defined(AF_INET6)
889             else if (addr->ss_family == AF_INET6) {
890                 struct sockaddr_in6 *addr6;
891 
892                 addr6 = (struct sockaddr_in6*)addr;
893                 addr6->sin6_port = htons(port);
894                 addr6->sin6_scope_id = 0;
895                 addr6->sin6_flowinfo = 0;
896                 memcpy(&addr6->sin6_addr, *ptr, he->h_length);
897                 zh->addrs.count++;
898             }
899 #endif
900             else {
901                 LOG_WARN(LOGCALLBACK(zh), "skipping unknown address family %x for %s",
902                          addr->ss_family, hosts_in);
903             }
904         }
905         host = strtok_r(0, ",", &strtok_last);
906         }
907 #else
908         {
909         struct addrinfo hints, *res, *res0;
910 
911         memset(&hints, 0, sizeof(hints));
912 #ifdef AI_ADDRCONFIG
913         hints.ai_flags = AI_ADDRCONFIG;
914 #else
915         hints.ai_flags = 0;
916 #endif
917         hints.ai_family = AF_UNSPEC;
918         hints.ai_socktype = SOCK_STREAM;
919         hints.ai_protocol = IPPROTO_TCP;
920 
921         while(isspace(*host) && host != strtok_last)
922             host++;
923 
924         if ((rc = getaddrinfo(host, port_spec, &hints, &res0)) != 0) {
925             //bug in getaddrinfo implementation when it returns
926             //EAI_BADFLAGS or EAI_ADDRFAMILY with AF_UNSPEC and
927             // ai_flags as AI_ADDRCONFIG
928 #ifdef AI_ADDRCONFIG
929             if ((hints.ai_flags == AI_ADDRCONFIG) &&
930 // ZOOKEEPER-1323 EAI_NODATA and EAI_ADDRFAMILY are deprecated in FreeBSD.
931 #ifdef EAI_ADDRFAMILY
932                 ((rc ==EAI_BADFLAGS) || (rc == EAI_ADDRFAMILY))) {
933 #else
934                 (rc == EAI_BADFLAGS)) {
935 #endif
936                 //reset ai_flags to null
937                 hints.ai_flags = 0;
938                 //retry getaddrinfo
939                 rc = getaddrinfo(host, port_spec, &hints, &res0);
940             }
941 #endif
942             if (rc != 0) {
943                 errno = getaddrinfo_errno(rc);
944 #ifdef _WIN32
945                 LOG_ERROR(LOGCALLBACK(zh), "Win32 message: %s\n", gai_strerror(rc));
946 #elif __linux__ && __GNUC__
947                 LOG_ERROR(LOGCALLBACK(zh), "getaddrinfo: %s\n", gai_strerror(rc));
948 #else
949                 LOG_ERROR(LOGCALLBACK(zh), "getaddrinfo: %s\n", strerror(errno));
950 #endif
951                 rc=ZSYSTEMERROR;
952                 goto next;
953             }
954         }
955 
956         for (res = res0; res; res = res->ai_next) {
957             // Expand address list if needed
958             if (avec->count == avec->capacity) {
959                 rc = addrvec_grow_default(avec);
960                 if (rc != 0) {
961                     LOG_ERROR(LOGCALLBACK(zh), "out of memory");
962                     errno=ENOMEM;
963                     rc=ZSYSTEMERROR;
964                     goto fail;
965                 }
966             }
967 
968             // Copy addrinfo into address list
969             switch (res->ai_family) {
970             case AF_INET:
971 #if defined(AF_INET6)
972             case AF_INET6:
973 #endif
974                 addrvec_append_addrinfo(avec, res);
975                 break;
976             default:
977                 LOG_WARN(LOGCALLBACK(zh), "skipping unknown address family %x for %s",
978                           res->ai_family, hosts_in);
979                 break;
980             }
981         }
982 
983         freeaddrinfo(res0);
984 next:
985         host = strtok_r(0, ",", &strtok_last);
986         }
987 #endif
988     }
989     if (avec->count == 0) {
990       rc = ZSYSTEMERROR; // not a single host resolved
991       goto fail;
992     }
993 
994     free(hosts);
995 
996     if(!disable_conn_permute){
997         setup_random();
998         addrvec_shuffle(avec);
999     }
1000 
1001     return ZOK;
1002 
1003 fail:
1004     addrvec_free(avec);
1005 
1006     if (hosts) {
1007         free(hosts);
1008         hosts = NULL;
1009     }
1010 
1011     return rc;
1012 }
1013 
1014 /**
1015  * Updates the list of servers and determine if changing connections is necessary.
1016  * Permutes server list for proper load balancing.
1017  *
1018  * Changing connections is necessary if one of the following holds:
1019  * a) the server this client is currently connected is not in new address list.
1020  *    Otherwise (if currentHost is in the new list):
1021  * b) the number of servers in the cluster is increasing - in this case the load
1022  *    on currentHost should decrease, which means that SOME of the clients
1023  *    connected to it will migrate to the new servers. The decision whether this
1024  *    client migrates or not is probabilistic so that the expected number of
1025  *    clients connected to each server is the same.
1026  *
1027  * If reconfig is set to true, the function sets pOld and pNew that correspond
1028  * to the probability to migrate to ones of the new servers or one of the old
1029  * servers (migrating to one of the old servers is done only if our client's
1030  * currentHost is not in new list).
1031  *
1032  * See zoo_cycle_next_server for the selection logic.
1033  *
1034  * \param ref_time an optional "reference time," used to determine if
1035  * resolution can be skipped in accordance to the delay set by \ref
1036  * zoo_set_servers_resolution_delay.  Passing NULL prevents skipping.
1037  *
1038  * See {@link https://issues.apache.org/jira/browse/ZOOKEEPER-1355} for the
1039  * protocol and its evaluation,
1040  */
1041 int update_addrs(zhandle_t *zh, const struct timeval *ref_time)
1042 {
1043     int rc = ZOK;
1044     char *hosts = NULL;
1045     uint32_t num_old = 0;
1046     uint32_t num_new = 0;
1047     uint32_t i = 0;
1048     int found_current = 0;
1049     addrvec_t resolved = { 0 };
1050 
1051     // Verify we have a valid handle
1052     if (zh == NULL) {
1053         return ZBADARGUMENTS;
1054     }
1055 
1056     // zh->hostname should always be set
1057     if (zh->hostname == NULL)
1058     {
1059         return ZSYSTEMERROR;
1060     }
1061 
1062     // NOTE: guard access to {hostname, addr_cur, addrs, addrs_old, addrs_new, last_resolve, resolve_delay_ms}
1063     lock_reconfig(zh);
1064 
1065     // Check if we are due for a host name resolution.  (See
1066     // zoo_set_servers_resolution_delay.  The answer is always "yes"
1067     // if no reference is provided or the file descriptor is invalid.)
1068     if (ref_time && zh->fd->sock != -1) {
1069         int do_resolve;
1070 
1071         if (zh->resolve_delay_ms <= 0) {
1072             // -1 disables, 0 means unconditional.  Fail safe.
1073             do_resolve = zh->resolve_delay_ms != -1;
1074         } else {
1075             int elapsed_ms = calculate_interval(&zh->last_resolve, ref_time);
1076             // Include < 0 in case of overflow, or if we are not
1077             // backed by a monotonic clock.
1078             do_resolve = elapsed_ms > zh->resolve_delay_ms || elapsed_ms < 0;
1079         }
1080 
1081         if (!do_resolve) {
1082             goto finish;
1083         }
1084     }
1085 
1086     // Copy zh->hostname for local use
1087     hosts = strdup(zh->hostname);
1088     if (hosts == NULL) {
1089         rc = ZSYSTEMERROR;
1090         goto finish;
1091     }
1092 
1093     rc = resolve_hosts(zh, hosts, &resolved);
1094     if (rc != ZOK)
1095     {
1096         goto finish;
1097     }
1098 
1099     // Unconditionally note last resolution time.
1100     if (ref_time) {
1101         zh->last_resolve = *ref_time;
1102     } else {
1103         get_system_time(&zh->last_resolve);
1104     }
1105 
1106     // If the addrvec list is identical to last time we ran don't do anything
1107     if (addrvec_eq(&zh->addrs, &resolved))
1108     {
1109         goto finish;
1110     }
1111 
1112     // Is the server we're connected to in the new resolved list?
1113     found_current = addrvec_contains(&resolved, &zh->addr_cur);
1114 
1115     // Clear out old and new address lists
1116     zh->reconfig = 1;
1117     addrvec_free(&zh->addrs_old);
1118     addrvec_free(&zh->addrs_new);
1119 
1120     // Divide server list into addrs_old if in previous list and addrs_new if not
1121     for (i = 0; i < resolved.count; i++)
1122     {
1123         struct sockaddr_storage *resolved_address = &resolved.data[i];
1124         if (addrvec_contains(&zh->addrs, resolved_address))
1125         {
1126             rc = addrvec_append(&zh->addrs_old, resolved_address);
1127             if (rc != ZOK)
1128             {
1129                 goto finish;
1130             }
1131         }
1132         else {
1133             rc = addrvec_append(&zh->addrs_new, resolved_address);
1134             if (rc != ZOK)
1135             {
1136                 goto finish;
1137             }
1138         }
1139     }
1140 
1141     num_old = zh->addrs_old.count;
1142     num_new = zh->addrs_new.count;
1143 
1144     // Number of servers increased
1145     if (num_old + num_new > zh->addrs.count)
1146     {
1147         if (found_current) {
1148             // my server is in the new config, but load should be decreased.
1149             // Need to decide if the client is moving to one of the new servers
1150             if (drand48() <= (1 - ((double)zh->addrs.count) / (num_old + num_new))) {
1151                 zh->pNew = 1;
1152                 zh->pOld = 0;
1153             } else {
1154                 // do nothing special -- stay with the current server
1155                 zh->reconfig = 0;
1156             }
1157         } else {
1158             // my server is not in the new config, and load on old servers must
1159             // be decreased, so connect to one of the new servers
1160             zh->pNew = 1;
1161             zh->pOld = 0;
1162         }
1163     }
1164 
1165     // Number of servers stayed the same or decreased
1166     else {
1167         if (found_current) {
1168             // my server is in the new config, and load should be increased, so
1169             // stay with this server and do nothing special
1170             zh->reconfig = 0;
1171         } else {
1172             zh->pOld = ((double) (num_old * (zh->addrs.count - (num_old + num_new)))) / ((num_old + num_new) * (zh->addrs.count - num_old));
1173             zh->pNew = 1 - zh->pOld;
1174         }
1175     }
1176 
1177     addrvec_free(&zh->addrs);
1178     zh->addrs = resolved;
1179 
1180     // If we need to do a reconfig and we're currently connected to a server,
1181     // then force close that connection so on next interest() call we'll make a
1182     // new connection
1183     if (zh->reconfig == 1 && zh->fd->sock != -1)
1184     {
1185         close_zsock(zh->fd);
1186         zh->state = ZOO_NOTCONNECTED_STATE;
1187     }
1188 
1189 finish:
1190 
1191     unlock_reconfig(zh);
1192 
1193     // If we short-circuited out and never assigned resolved to zh->addrs then we
1194     // need to free resolved to avoid a memleak.
1195     if (resolved.data && zh->addrs.data != resolved.data)
1196     {
1197         addrvec_free(&resolved);
1198     }
1199 
1200     if (hosts) {
1201         free(hosts);
1202         hosts = NULL;
1203     }
1204 
1205     return rc;
1206 }
1207 
1208 const clientid_t *zoo_client_id(zhandle_t *zh)
1209 {
1210     return &zh->client_id;
1211 }
1212 
1213 static void null_watcher_fn(zhandle_t* p1, int p2, int p3,const char* p4,void*p5){}
1214 
1215 watcher_fn zoo_set_watcher(zhandle_t *zh,watcher_fn newFn)
1216 {
1217     watcher_fn oldWatcher=zh->watcher;
1218     if (newFn) {
1219        zh->watcher = newFn;
1220     } else {
1221        zh->watcher = null_watcher_fn;
1222     }
1223     return oldWatcher;
1224 }
1225 
1226 struct sockaddr* zookeeper_get_connected_host(zhandle_t *zh,
1227                  struct sockaddr *addr, socklen_t *addr_len)
1228 {
1229     if (zh->state!=ZOO_CONNECTED_STATE) {
1230         return NULL;
1231     }
1232     if (getpeername(zh->fd->sock, addr, addr_len)==-1) {
1233         return NULL;
1234     }
1235     return addr;
1236 }
1237 
1238 static void log_env(zhandle_t *zh) {
1239   char buf[2048];
1240 #ifdef HAVE_SYS_UTSNAME_H
1241   struct utsname utsname;
1242 #endif
1243 
1244 #if defined(HAVE_GETUID) && defined(HAVE_GETPWUID_R)
1245   struct passwd pw;
1246   struct passwd *pwp = NULL;
1247   uid_t uid = 0;
1248 #endif
1249 
1250   LOG_INFO(LOGCALLBACK(zh), "Client environment:zookeeper.version=%s", PACKAGE_STRING);
1251 
1252 #ifdef HAVE_GETHOSTNAME
1253   gethostname(buf, sizeof(buf));
1254   LOG_INFO(LOGCALLBACK(zh), "Client environment:host.name=%s", buf);
1255 #else
1256   LOG_INFO(LOGCALLBACK(zh), "Client environment:host.name=<not implemented>");
1257 #endif
1258 
1259 #ifdef HAVE_SYS_UTSNAME_H
1260   uname(&utsname);
1261   LOG_INFO(LOGCALLBACK(zh), "Client environment:os.name=%s", utsname.sysname);
1262   LOG_INFO(LOGCALLBACK(zh), "Client environment:os.arch=%s", utsname.release);
1263   LOG_INFO(LOGCALLBACK(zh), "Client environment:os.version=%s", utsname.version);
1264 #else
1265   LOG_INFO(LOGCALLBACK(zh), "Client environment:os.name=<not implemented>");
1266   LOG_INFO(LOGCALLBACK(zh), "Client environment:os.arch=<not implemented>");
1267   LOG_INFO(LOGCALLBACK(zh), "Client environment:os.version=<not implemented>");
1268 #endif
1269 
1270 #ifdef HAVE_GETLOGIN
1271   LOG_INFO(LOGCALLBACK(zh), "Client environment:user.name=%s", getlogin());
1272 #else
1273   LOG_INFO(LOGCALLBACK(zh), "Client environment:user.name=<not implemented>");
1274 #endif
1275 
1276 #if defined(HAVE_GETUID) && defined(HAVE_GETPWUID_R)
1277   uid = getuid();
1278   if (!getpwuid_r(uid, &pw, buf, sizeof(buf), &pwp) && pwp) {
1279     LOG_INFO(LOGCALLBACK(zh), "Client environment:user.home=%s", pw.pw_dir);
1280   } else {
1281     LOG_INFO(LOGCALLBACK(zh), "Client environment:user.home=<NA>");
1282   }
1283 #else
1284   LOG_INFO(LOGCALLBACK(zh), "Client environment:user.home=<not implemented>");
1285 #endif
1286 
1287 #ifdef HAVE_GETCWD
1288   if (!getcwd(buf, sizeof(buf))) {
1289     LOG_INFO(LOGCALLBACK(zh), "Client environment:user.dir=<toolong>");
1290   } else {
1291     LOG_INFO(LOGCALLBACK(zh), "Client environment:user.dir=%s", buf);
1292   }
1293 #else
1294   LOG_INFO(LOGCALLBACK(zh), "Client environment:user.dir=<not implemented>");
1295 #endif
1296 }
1297 
1298 /**
1299  * Create a zookeeper handle associated with the given host and port.
1300  */
1301 static zhandle_t *zookeeper_init_internal(const char *host, watcher_fn watcher,
1302         int recv_timeout, const clientid_t *clientid, void *context, int flags,
1303         log_callback_fn log_callback, zcert_t *cert, void *sasl_params)
1304 {
1305     int errnosave = 0;
1306     zhandle_t *zh = NULL;
1307     char *index_chroot = NULL;
1308 
1309     // Create our handle
1310     zh = calloc(1, sizeof(*zh));
1311     if (!zh) {
1312         return 0;
1313     }
1314 
1315     // Set log callback before calling into log_env
1316     zh->log_callback = log_callback;
1317 
1318     if (!(flags & ZOO_NO_LOG_CLIENTENV)) {
1319         log_env(zh);
1320     }
1321 
1322     zh->fd = calloc(1, sizeof(zsock_t));
1323     zh->fd->sock = -1;
1324     if (cert) {
1325         zh->fd->cert = calloc(1, sizeof(zcert_t));
1326         memcpy(zh->fd->cert, cert, sizeof(zcert_t));
1327     }
1328 
1329 #ifdef _WIN32
1330     if (Win32WSAStartup()){
1331         LOG_ERROR(LOGCALLBACK(zh), "Error initializing ws2_32.dll");
1332         return 0;
1333     }
1334 #endif
1335     LOG_INFO(LOGCALLBACK(zh), "Initiating client connection, host=%s sessionTimeout=%d watcher=%p"
1336           " sessionId=%#llx sessionPasswd=%s context=%p flags=%d",
1337               host,
1338               recv_timeout,
1339               watcher,
1340               (clientid == 0 ? 0 : clientid->client_id),
1341               ((clientid == 0) || (clientid->passwd[0] == 0) ?
1342                "<null>" : "<hidden>"),
1343               context,
1344               flags);
1345 
1346     zh->hostname = NULL;
1347     zh->state = ZOO_NOTCONNECTED_STATE;
1348     zh->context = context;
1349     zh->recv_timeout = recv_timeout;
1350     zh->allow_read_only = flags & ZOO_READONLY;
1351     // non-zero clientid implies we've seen r/w server already
1352     zh->seen_rw_server_before = (clientid != 0 && clientid->client_id != 0);
1353     init_auth_info(&zh->auth_h);
1354     if (watcher) {
1355        zh->watcher = watcher;
1356     } else {
1357        zh->watcher = null_watcher_fn;
1358     }
1359     if (host == 0 || *host == 0) { // what we shouldn't dup
1360         errno=EINVAL;
1361         goto abort;
1362     }
1363     //parse the host to get the chroot if available
1364     index_chroot = strchr(host, '/');
1365     if (index_chroot) {
1366         zh->chroot = strdup(index_chroot);
1367         if (zh->chroot == NULL) {
1368             goto abort;
1369         }
1370         // if chroot is just / set it to null
1371         if (strlen(zh->chroot) == 1) {
1372             free(zh->chroot);
1373             zh->chroot = NULL;
1374         }
1375         // cannot use strndup so allocate and strcpy
1376         zh->hostname = (char *) malloc(index_chroot - host + 1);
1377         zh->hostname = strncpy(zh->hostname, host, (index_chroot - host));
1378         //strncpy does not null terminate
1379         *(zh->hostname + (index_chroot - host)) = '\0';
1380 
1381     } else {
1382         zh->chroot = NULL;
1383         zh->hostname = strdup(host);
1384     }
1385     if (zh->chroot && !isValidPath(zh->chroot, 0)) {
1386         errno = EINVAL;
1387         goto abort;
1388     }
1389     if (zh->hostname == 0) {
1390         goto abort;
1391     }
1392     if(update_addrs(zh, NULL) != 0) {
1393         goto abort;
1394     }
1395 
1396     if (clientid) {
1397         memcpy(&zh->client_id, clientid, sizeof(zh->client_id));
1398     } else {
1399         memset(&zh->client_id, 0, sizeof(zh->client_id));
1400     }
1401     zh->io_count = 0;
1402     zh->primer_buffer.buffer = zh->primer_storage_buffer;
1403     zh->primer_buffer.curr_offset = 0;
1404     zh->primer_buffer.len = sizeof(zh->primer_storage_buffer);
1405     zh->primer_buffer.next = 0;
1406     zh->last_zxid = 0;
1407     zh->next_deadline.tv_sec=zh->next_deadline.tv_usec=0;
1408     zh->socket_readable.tv_sec=zh->socket_readable.tv_usec=0;
1409     zh->active_node_watchers=create_zk_hashtable();
1410     zh->active_exist_watchers=create_zk_hashtable();
1411     zh->active_child_watchers=create_zk_hashtable();
1412     zh->disable_reconnection_attempt = 0;
1413 
1414 #ifdef HAVE_CYRUS_SASL_H
1415     if (sasl_params) {
1416         zh->sasl_client = zoo_sasl_client_create(
1417             (zoo_sasl_params_t*)sasl_params);
1418         if (!zh->sasl_client) {
1419             goto abort;
1420         }
1421     }
1422 #endif /* HAVE_CYRUS_SASL_H */
1423 
1424     if (adaptor_init(zh) == -1) {
1425         goto abort;
1426     }
1427 
1428     return zh;
1429 abort:
1430     errnosave=errno;
1431     destroy(zh);
1432     free(zh->fd);
1433     free(zh);
1434     errno=errnosave;
1435     return 0;
1436 }
1437 
1438 zhandle_t *zookeeper_init(const char *host, watcher_fn watcher,
1439         int recv_timeout, const clientid_t *clientid, void *context, int flags)
1440 {
1441     return zookeeper_init_internal(host, watcher, recv_timeout, clientid, context, flags, NULL, NULL, NULL);
1442 }
1443 
1444 zhandle_t *zookeeper_init2(const char *host, watcher_fn watcher,
1445         int recv_timeout, const clientid_t *clientid, void *context, int flags,
1446         log_callback_fn log_callback)
1447 {
1448     return zookeeper_init_internal(host, watcher, recv_timeout, clientid, context, flags, log_callback, NULL, NULL);
1449 }
1450 
#ifdef HAVE_OPENSSL_H
/**
 * Creates a handle whose socket descriptor carries a certificate
 * description.
 *
 * \param cert comma separated list; its fields are assigned, in order, to
 *        the ca, cert, key and passwd members of the certificate struct
 *        (missing trailing fields stay NULL).  The unsplit string is kept in
 *        certstr.  Must not be NULL.
 * \return the new handle, or 0 on failure.  errno is EINVAL for a NULL cert
 *         and ENOMEM when the certificate string cannot be duplicated.
 *
 * NOTE(review): the two duplicated strings are not released here when
 * zookeeper_init_internal fails; whether another cleanup path owns them is
 * not visible from this function -- confirm before freeing them here.
 */
zhandle_t *zookeeper_init_ssl(const char *host, const char *cert, watcher_fn watcher,
        int recv_timeout, const clientid_t *clientid, void *context, int flags)
{
    zcert_t zcert;
    char *fields = NULL;
    char *save = NULL;

    if (cert == NULL) {
        errno = EINVAL;
        return 0;
    }

    // Start from a fully defined struct: it is copied wholesale by
    // zookeeper_init_internal.
    memset(&zcert, 0, sizeof(zcert));

    zcert.certstr = strdup(cert);
    fields = strdup(cert);
    if (zcert.certstr == NULL || fields == NULL) {
        free(zcert.certstr);
        free(fields);
        errno = ENOMEM;
        return 0;
    }

    // strtok_r keeps its position in `save`, so this neither disturbs nor is
    // disturbed by other strtok users in the process.
    zcert.ca = strtok_r(fields, ",", &save);
    zcert.cert = strtok_r(NULL, ",", &save);
    zcert.key = strtok_r(NULL, ",", &save);
    zcert.passwd = strtok_r(NULL, ",", &save);
    return zookeeper_init_internal(host, watcher, recv_timeout, clientid, context, flags, NULL, &zcert, NULL);
}
#endif
1464 
#ifdef HAVE_CYRUS_SASL_H
/*
 * Creates a handle through zookeeper_init_internal, forwarding the log
 * callback and the SASL parameters; no certificate is passed.
 */
zhandle_t *zookeeper_init_sasl(const char *host, watcher_fn watcher,
        int recv_timeout, const clientid_t *clientid, void *context, int flags,
        log_callback_fn log_callback, zoo_sasl_params_t *sasl_params)
{
    zhandle_t *handle = zookeeper_init_internal(host, watcher, recv_timeout,
            clientid, context, flags, log_callback, NULL, sasl_params);

    return handle;
}
#endif /* HAVE_CYRUS_SASL_H */
1473 
1474 /**
1475  * Set a new list of zk servers to connect to.  Disconnect will occur if
1476  * current connection endpoint is not in the list.
1477  */
1478 int zoo_set_servers(zhandle_t *zh, const char *hosts)
1479 {
1480     if (hosts == NULL)
1481     {
1482         LOG_ERROR(LOGCALLBACK(zh), "New server list cannot be empty");
1483         return ZBADARGUMENTS;
1484     }
1485 
1486     // NOTE: guard access to {hostname, addr_cur, addrs, addrs_old, addrs_new, last_resolve, resolve_delay_ms}
1487     lock_reconfig(zh);
1488 
1489     // Reset hostname to new set of hosts to connect to
1490     if (zh->hostname) {
1491         free(zh->hostname);
1492     }
1493 
1494     zh->hostname = strdup(hosts);
1495 
1496     unlock_reconfig(zh);
1497 
1498     return update_addrs(zh, NULL);
1499 }
1500 
1501 /*
1502  * Sets a minimum delay to observe between "routine" host name
1503  * resolutions.  See prototype for full documentation.
1504  */
1505 int zoo_set_servers_resolution_delay(zhandle_t *zh, int delay_ms) {
1506     if (delay_ms < -1) {
1507         LOG_ERROR(LOGCALLBACK(zh), "Resolution delay cannot be %d", delay_ms);
1508         return ZBADARGUMENTS;
1509     }
1510 
1511     // NOTE: guard access to {hostname, addr_cur, addrs, addrs_old, addrs_new, last_resolve, resolve_delay_ms}
1512     lock_reconfig(zh);
1513 
1514     zh->resolve_delay_ms = delay_ms;
1515 
1516     unlock_reconfig(zh);
1517 
1518     return ZOK;
1519 }
1520 
1521 /**
1522  * Get the next server to connect to, when in 'reconfig' mode, which means that
1523  * we've updated the server list to connect to, and are now trying to find some
1524  * server to connect to. Once we get successfully connected, 'reconfig' mode is
1525  * set to false. Similarly, if we tried to connect to all servers in new config
1526  * and failed, 'reconfig' mode is set to false.
1527  *
1528  * While in 'reconfig' mode, we should connect to a server in the new set of
1529  * servers (addrs_new) with probability pNew and to servers in the old set of
1530  * servers (addrs_old) with probability pOld (which is just 1-pNew). If we tried
1531  * out all servers in either, we continue to try servers from the other set,
1532  * regardless of pNew or pOld. If we tried all servers we give up and go back to
1533  * the normal round robin mode
1534  *
1535  * When called, must be protected by lock_reconfig(zh).
1536  */
1537 static int get_next_server_in_reconfig(zhandle_t *zh)
1538 {
1539     int take_new = drand48() <= zh->pNew;
1540 
1541     LOG_DEBUG(LOGCALLBACK(zh), "[OLD] count=%d capacity=%d next=%d hasnext=%d",
1542                zh->addrs_old.count, zh->addrs_old.capacity, zh->addrs_old.next,
1543                addrvec_hasnext(&zh->addrs_old));
1544     LOG_DEBUG(LOGCALLBACK(zh), "[NEW] count=%d capacity=%d next=%d hasnext=%d",
1545                zh->addrs_new.count, zh->addrs_new.capacity, zh->addrs_new.next,
1546                addrvec_hasnext(&zh->addrs_new));
1547 
1548     // Take one of the new servers if we haven't tried them all yet
1549     // and either the probability tells us to connect to one of the new servers
1550     // or if we already tried them all then use one of the old servers
1551     if (addrvec_hasnext(&zh->addrs_new)
1552             && (take_new || !addrvec_hasnext(&zh->addrs_old)))
1553     {
1554         addrvec_next(&zh->addrs_new, &zh->addr_cur);
1555         LOG_DEBUG(LOGCALLBACK(zh), "Using next from NEW=%s", format_endpoint_info(&zh->addr_cur));
1556         return 0;
1557     }
1558 
1559     // start taking old servers
1560     if (addrvec_hasnext(&zh->addrs_old)) {
1561         addrvec_next(&zh->addrs_old, &zh->addr_cur);
1562         LOG_DEBUG(LOGCALLBACK(zh), "Using next from OLD=%s", format_endpoint_info(&zh->addr_cur));
1563         return 0;
1564     }
1565 
1566     LOG_DEBUG(LOGCALLBACK(zh), "Failed to find either new or old");
1567     memset(&zh->addr_cur, 0, sizeof(zh->addr_cur));
1568     return 1;
1569 }
1570 
1571 /**
1572  * Cycle through our server list to the correct 'next' server. The 'next' server
1573  * to connect to depends upon whether we're in a 'reconfig' mode or not. Reconfig
1574  * mode means we've upated the server list and are now trying to find a server
1575  * to connect to. Once we get connected, we are no longer in the reconfig mode.
1576  * Similarly, if we try to connect to all the servers in the new configuration
1577  * and failed, reconfig mode is set to false.
1578  *
1579  * For more algorithm details, see get_next_server_in_reconfig.
1580  */
1581 void zoo_cycle_next_server(zhandle_t *zh)
1582 {
1583     // NOTE: guard access to {hostname, addr_cur, addrs, addrs_old, addrs_new, last_resolve, resolve_delay_ms}
1584     lock_reconfig(zh);
1585 
1586     memset(&zh->addr_cur, 0, sizeof(zh->addr_cur));
1587 
1588     if (zh->reconfig)
1589     {
1590         if (get_next_server_in_reconfig(zh) == 0) {
1591             unlock_reconfig(zh);
1592             return;
1593         }
1594 
1595         // tried all new and old servers and couldn't connect
1596         zh->reconfig = 0;
1597     }
1598 
1599     addrvec_next(&zh->addrs, &zh->addr_cur);
1600 
1601     unlock_reconfig(zh);
1602 
1603     return;
1604 }
1605 
1606 /**
1607  * Get the host:port for the server we are currently connecting to or connected
1608  * to. This is largely for testing purposes but is also generally useful for
1609  * other client software built on top of this client.
1610  */
1611 const char* zoo_get_current_server(zhandle_t* zh)
1612 {
1613     const char *endpoint_info = NULL;
1614 
1615     // NOTE: guard access to {hostname, addr_cur, addrs, addrs_old, addrs_new, last_resolve, resolve_delay_ms}
1616     // Need the lock here as it is changed in update_addrs()
1617     lock_reconfig(zh);
1618 
1619     endpoint_info = format_endpoint_info(&zh->addr_cur);
1620     unlock_reconfig(zh);
1621     return endpoint_info;
1622 }
1623 
/**
 * Releases free_path unless it is the very same pointer as path, i.e. only
 * when it refers to separately allocated storage.
 */
void free_duplicate_path(const char *free_path, const char* path) {
    if (free_path == path) {
        return;
    }
    free((void*)free_path);
}
1633 
1634 /**
1635   prepend the chroot path if available else return the path
1636 */
1637 static char* prepend_string(zhandle_t *zh, const char* client_path) {
1638     char *ret_str;
1639     if (zh == NULL || zh->chroot == NULL)
1640         return (char *) client_path;
1641     // handle the chroot itself, client_path = "/"
1642     if (strlen(client_path) == 1) {
1643         return strdup(zh->chroot);
1644     }
1645     ret_str = (char *) malloc(strlen(zh->chroot) + strlen(client_path) + 1);
1646     strcpy(ret_str, zh->chroot);
1647     return strcat(ret_str, client_path);
1648 }
1649 
1650 /**
1651    strip off the chroot string from the server path
1652    if there is one else return the exact path
1653  */
1654 char* sub_string(zhandle_t *zh, const char* server_path) {
1655     char *ret_str;
1656     if (zh->chroot == NULL)
1657         return (char *) server_path;
1658     //ZOOKEEPER-1027
1659     if (strncmp(server_path, zh->chroot, strlen(zh->chroot)) != 0) {
1660         LOG_ERROR(LOGCALLBACK(zh), "server path %s does not include chroot path %s",
1661                    server_path, zh->chroot);
1662         return (char *) server_path;
1663     }
1664     if (strlen(server_path) == strlen(zh->chroot)) {
1665         //return "/"
1666         ret_str = strdup("/");
1667         return ret_str;
1668     }
1669     ret_str = strdup(server_path + strlen(zh->chroot));
1670     return ret_str;
1671 }
1672 
1673 static buffer_list_t *allocate_buffer(char *buff, int len)
1674 {
1675     buffer_list_t *buffer = calloc(1, sizeof(*buffer));
1676     if (buffer == 0)
1677         return 0;
1678 
1679     buffer->len = len==0?sizeof(*buffer):len;
1680     buffer->curr_offset = 0;
1681     buffer->buffer = buff;
1682     buffer->next = 0;
1683     return buffer;
1684 }
1685 
1686 static void free_buffer(buffer_list_t *b)
1687 {
1688     if (!b) {
1689         return;
1690     }
1691     if (b->buffer) {
1692         free(b->buffer);
1693     }
1694     free(b);
1695 }
1696 
1697 static buffer_list_t *dequeue_buffer(buffer_head_t *list)
1698 {
1699     buffer_list_t *b;
1700     lock_buffer_list(list);
1701     b = list->head;
1702     if (b) {
1703         list->head = b->next;
1704         if (!list->head) {
1705             assert(b == list->last);
1706             list->last = 0;
1707         }
1708     }
1709     unlock_buffer_list(list);
1710     return b;
1711 }
1712 
1713 static int remove_buffer(buffer_head_t *list)
1714 {
1715     buffer_list_t *b = dequeue_buffer(list);
1716     if (!b) {
1717         return 0;
1718     }
1719     free_buffer(b);
1720     return 1;
1721 }
1722 
1723 static void queue_buffer(buffer_head_t *list, buffer_list_t *b, int add_to_front)
1724 {
1725     b->next = 0;
1726     lock_buffer_list(list);
1727     if (list->head) {
1728         assert(list->last);
1729         // The list is not empty
1730         if (add_to_front) {
1731             b->next = list->head;
1732             list->head = b;
1733         } else {
1734             list->last->next = b;
1735             list->last = b;
1736         }
1737     }else{
1738         // The list is empty
1739         assert(!list->head);
1740         list->head = b;
1741         list->last = b;
1742     }
1743     unlock_buffer_list(list);
1744 }
1745 
1746 static int queue_buffer_bytes(buffer_head_t *list, char *buff, int len)
1747 {
1748     buffer_list_t *b  = allocate_buffer(buff,len);
1749     if (!b)
1750         return ZSYSTEMERROR;
1751     queue_buffer(list, b, 0);
1752     return ZOK;
1753 }
1754 
1755 static int queue_front_buffer_bytes(buffer_head_t *list, char *buff, int len)
1756 {
1757     buffer_list_t *b  = allocate_buffer(buff,len);
1758     if (!b)
1759         return ZSYSTEMERROR;
1760     queue_buffer(list, b, 1);
1761     return ZOK;
1762 }
1763 
1764 static __attribute__ ((unused)) int get_queue_len(buffer_head_t *list)
1765 {
1766     int i;
1767     buffer_list_t *ptr;
1768     lock_buffer_list(list);
1769     ptr = list->head;
1770     for (i=0; ptr!=0; ptr=ptr->next, i++)
1771         ;
1772     unlock_buffer_list(list);
1773     return i;
1774 }
1775 /* returns:
1776  * -1 if send failed,
1777  * 0 if send would block while sending the buffer (or a send was incomplete),
1778  * 1 if success
1779  */
1780 static int send_buffer(zhandle_t *zh, buffer_list_t *buff)
1781 {
1782     int len = buff->len;
1783     int off = buff->curr_offset;
1784     int rc = -1;
1785 
1786     if (off < 4) {
1787         /* we need to send the length at the beginning */
1788         int nlen = htonl(len);
1789         char *b = (char*)&nlen;
1790         rc = zookeeper_send(zh->fd, b + off, sizeof(nlen) - off);
1791         if (rc == -1) {
1792 #ifdef _WIN32
1793             if (WSAGetLastError() != WSAEWOULDBLOCK) {
1794 #else
1795             if (errno != EAGAIN) {
1796 #endif
1797                 return -1;
1798             } else {
1799                 return 0;
1800             }
1801         } else {
1802             buff->curr_offset  += rc;
1803         }
1804         off = buff->curr_offset;
1805     }
1806     if (off >= 4) {
1807         /* want off to now represent the offset into the buffer */
1808         off -= sizeof(buff->len);
1809         rc = zookeeper_send(zh->fd, buff->buffer + off, len - off);
1810         if (rc == -1) {
1811 #ifdef _WIN32
1812             if (WSAGetLastError() != WSAEWOULDBLOCK) {
1813 #else
1814             if (errno != EAGAIN) {
1815 #endif
1816                 return -1;
1817             }
1818         } else {
1819             buff->curr_offset += rc;
1820         }
1821     }
1822     return buff->curr_offset == len + sizeof(buff->len);
1823 }
1824 
1825 /* returns:
1826  * -1 if recv call failed,
1827  * 0 if recv would block,
1828  * 1 if success
1829  */
1830 static int recv_buffer(zhandle_t *zh, buffer_list_t *buff)
1831 {
1832     int off = buff->curr_offset;
1833     int rc = 0;
1834 
1835     /* if buffer is less than 4, we are reading in the length */
1836     if (off < 4) {
1837         char *buffer = (char*)&(buff->len);
1838         rc = zookeeper_recv(zh->fd, buffer+off, sizeof(int)-off, 0);
1839         switch (rc) {
1840         case 0:
1841             errno = EHOSTDOWN;
1842         case -1:
1843 #ifdef _WIN32
1844             if (WSAGetLastError() == WSAEWOULDBLOCK) {
1845 #else
1846             if (errno == EAGAIN) {
1847 #endif
1848                 return 0;
1849             }
1850             return -1;
1851         default:
1852             buff->curr_offset += rc;
1853         }
1854         off = buff->curr_offset;
1855         if (buff->curr_offset == sizeof(buff->len)) {
1856             buff->len = ntohl(buff->len);
1857             buff->buffer = calloc(1, buff->len);
1858         }
1859     }
1860     if (buff->buffer) {
1861         /* want off to now represent the offset into the buffer */
1862         off -= sizeof(buff->len);
1863 
1864         rc = zookeeper_recv(zh->fd, buff->buffer+off, buff->len-off, 0);
1865 
1866         /* dirty hack to make new client work against old server
1867          * old server sends 40 bytes to finish connection handshake,
1868          * while we're expecting 41 (1 byte for read-only mode data) */
1869         if (rc > 0 && buff == &zh->primer_buffer) {
1870             /* primer_buffer's curr_offset starts at 4 (see prime_connection) */
1871             int avail = buff->curr_offset - sizeof(buff->len) + rc;
1872 
1873             /* exactly 40 bytes (out of 41 expected) collected? */
1874             if (avail == buff->len - 1) {
1875                 int32_t reply_len;
1876 
1877                 /* extract length of ConnectResponse (+ 1-byte flag?) */
1878                 memcpy(&reply_len, buff->buffer, sizeof(reply_len));
1879                 reply_len = ntohl(reply_len);
1880 
1881                 /* if 1-byte flag was not sent, fake it (value 0) */
1882                 if ((int)(reply_len + sizeof(reply_len)) == buff->len - 1) {
1883                     ++rc;
1884                 }
1885             }
1886         }
1887 
1888         switch(rc) {
1889         case 0:
1890             errno = EHOSTDOWN;
1891         case -1:
1892 #ifdef _WIN32
1893             if (WSAGetLastError() == WSAEWOULDBLOCK) {
1894 #else
1895             if (errno == EAGAIN) {
1896 #endif
1897                 break;
1898             }
1899             return -1;
1900         default:
1901             buff->curr_offset += rc;
1902         }
1903     }
1904     return buff->curr_offset == buff->len + sizeof(buff->len);
1905 }
1906 
1907 void free_buffers(buffer_head_t *list)
1908 {
1909     while (remove_buffer(list))
1910         ;
1911 }
1912 
1913 void free_completions(zhandle_t *zh,int callCompletion,int reason)
1914 {
1915     completion_head_t tmp_list;
1916     struct oarchive *oa;
1917     struct ReplyHeader h;
1918     void_completion_t auth_completion = NULL;
1919     auth_completion_list_t a_list, *a_tmp;
1920 
1921     if (lock_completion_list(&zh->sent_requests) == 0) {
1922         tmp_list = zh->sent_requests;
1923         zh->sent_requests.head = 0;
1924         zh->sent_requests.last = 0;
1925         unlock_completion_list(&zh->sent_requests);
1926         while (tmp_list.head) {
1927             completion_list_t *cptr = tmp_list.head;
1928 
1929             tmp_list.head = cptr->next;
1930             if (cptr->c.data_result == SYNCHRONOUS_MARKER) {
1931 #ifdef THREADED
1932                 struct sync_completion
1933                             *sc = (struct sync_completion*)cptr->data;
1934                 sc->rc = reason;
1935                 notify_sync_completion(sc);
1936                 zh->outstanding_sync--;
1937                 destroy_completion_entry(cptr);
1938 #else
1939                 abort_singlethreaded(zh);
1940 #endif
1941             } else if (callCompletion) {
1942                 // Fake the response
1943                 buffer_list_t *bptr;
1944                 h.xid = cptr->xid;
1945                 h.zxid = -1;
1946                 h.err = reason;
1947                 oa = create_buffer_oarchive();
1948                 serialize_ReplyHeader(oa, "header", &h);
1949                 bptr = calloc(sizeof(*bptr), 1);
1950                 assert(bptr);
1951                 bptr->len = get_buffer_len(oa);
1952                 bptr->buffer = get_buffer(oa);
1953                 close_buffer_oarchive(&oa, 0);
1954                 cptr->buffer = bptr;
1955                 queue_completion(&zh->completions_to_process, cptr, 0);
1956             }
1957         }
1958     }
1959 
1960     zoo_lock_auth(zh);
1961     a_list.completion = NULL;
1962     a_list.next = NULL;
1963     get_auth_completions(&zh->auth_h, &a_list);
1964     zoo_unlock_auth(zh);
1965 
1966     a_tmp = &a_list;
1967     // chain call user's completion function
1968     while (a_tmp->completion != NULL) {
1969         auth_completion = a_tmp->completion;
1970         auth_completion(reason, a_tmp->auth_data);
1971         a_tmp = a_tmp->next;
1972         if (a_tmp == NULL)
1973             break;
1974     }
1975 
1976     free_auth_completion(&a_list);
1977 }
1978 
1979 static void cleanup_bufs(zhandle_t *zh,int callCompletion,int rc)
1980 {
1981     enter_critical(zh);
1982     free_buffers(&zh->to_send);
1983     free_buffers(&zh->to_process);
1984     free_completions(zh,callCompletion,rc);
1985     leave_critical(zh);
1986     if (zh->input_buffer && zh->input_buffer != &zh->primer_buffer) {
1987         free_buffer(zh->input_buffer);
1988         zh->input_buffer = 0;
1989     }
1990 }
1991 
1992 /* return 1 if zh's state is ZOO_CONNECTED_STATE or ZOO_READONLY_STATE,
1993  * 0 otherwise */
1994 static int is_connected(zhandle_t* zh)
1995 {
1996     return (zh->state==ZOO_CONNECTED_STATE || zh->state==ZOO_READONLY_STATE);
1997 }
1998 
1999 static void cleanup(zhandle_t *zh,int rc)
2000 {
2001     close_zsock(zh->fd);
2002     if (is_unrecoverable(zh)) {
2003         LOG_DEBUG(LOGCALLBACK(zh), "Calling a watcher for a ZOO_SESSION_EVENT and the state=%s",
2004                 state2String(zh->state));
2005         PROCESS_SESSION_EVENT(zh, zh->state);
2006     } else if (is_connected(zh)) {
2007         LOG_DEBUG(LOGCALLBACK(zh), "Calling a watcher for a ZOO_SESSION_EVENT and the state=CONNECTING_STATE");
2008         PROCESS_SESSION_EVENT(zh, ZOO_CONNECTING_STATE);
2009     }
2010     cleanup_bufs(zh,1,rc);
2011 
2012     LOG_DEBUG(LOGCALLBACK(zh), "Previous connection=%s delay=%d", zoo_get_current_server(zh), zh->delay);
2013 
2014     if (!is_unrecoverable(zh)) {
2015         zh->state = 0;
2016     }
2017     if (process_async(zh->outstanding_sync)) {
2018         process_completions(zh);
2019     }
2020 }
2021 
2022 static void handle_error(zhandle_t *zh,int rc)
2023 {
2024     cleanup(zh, rc);
2025     // NOTE: If we're at the end of the list of addresses to connect to, then
2026     // we want to delay the next connection attempt to avoid spinning.
2027     // Then increment what host we'll connect to since we failed to connect to current
2028     zh->delay = addrvec_atend(&zh->addrs);
2029     addrvec_next(&zh->addrs, &zh->addr_cur);
2030 }
2031 
2032 static int handle_socket_error_msg(zhandle_t *zh, int line, int rc,
2033         const char* format, ...)
2034 {
2035     if(logLevel>=ZOO_LOG_LEVEL_ERROR){
2036         va_list va;
2037         char buf[1024];
2038         va_start(va,format);
2039         vsnprintf(buf, sizeof(buf)-1,format,va);
2040         log_message(LOGCALLBACK(zh), ZOO_LOG_LEVEL_ERROR,line,__func__,
2041             "Socket %s zk retcode=%d, errno=%d(%s): %s",
2042             zoo_get_current_server(zh),rc,errno,strerror(errno),buf);
2043         va_end(va);
2044     }
2045     handle_error(zh,rc);
2046     return rc;
2047 }
2048 
2049 static void auth_completion_func(int rc, zhandle_t* zh)
2050 {
2051     void_completion_t auth_completion = NULL;
2052     auth_completion_list_t a_list;
2053     auth_completion_list_t *a_tmp;
2054 
2055     if(zh==NULL)
2056         return;
2057 
2058     zoo_lock_auth(zh);
2059 
2060     if(rc!=0){
2061         zh->state=ZOO_AUTH_FAILED_STATE;
2062     }else{
2063         //change state for all auths
2064         mark_active_auth(zh);
2065     }
2066     a_list.completion = NULL;
2067     a_list.next = NULL;
2068     get_auth_completions(&zh->auth_h, &a_list);
2069     zoo_unlock_auth(zh);
2070     if (rc) {
2071         LOG_ERROR(LOGCALLBACK(zh), "Authentication scheme %s failed. Connection closed.",
2072                    zh->auth_h.auth->scheme);
2073     }
2074     else {
2075         LOG_INFO(LOGCALLBACK(zh), "Authentication scheme %s succeeded", zh->auth_h.auth->scheme);
2076     }
2077     a_tmp = &a_list;
2078     // chain call user's completion function
2079     while (a_tmp->completion != NULL) {
2080         auth_completion = a_tmp->completion;
2081         auth_completion(rc, a_tmp->auth_data);
2082         a_tmp = a_tmp->next;
2083         if (a_tmp == NULL)
2084             break;
2085     }
2086     free_auth_completion(&a_list);
2087 }
2088 
2089 static int send_info_packet(zhandle_t *zh, auth_info* auth) {
2090     struct oarchive *oa;
2091     struct RequestHeader h = {AUTH_XID, ZOO_SETAUTH_OP};
2092     struct AuthPacket req;
2093     int rc;
2094     oa = create_buffer_oarchive();
2095     rc = serialize_RequestHeader(oa, "header", &h);
2096     req.type=0;   // ignored by the server
2097     req.scheme = auth->scheme;
2098     req.auth = auth->auth;
2099     rc = rc < 0 ? rc : serialize_AuthPacket(oa, "req", &req);
2100     /* add this buffer to the head of the send queue */
2101     rc = rc < 0 ? rc : queue_front_buffer_bytes(&zh->to_send, get_buffer(oa),
2102             get_buffer_len(oa));
2103     /* We queued the buffer, so don't free it */
2104     close_buffer_oarchive(&oa, 0);
2105 
2106     return rc;
2107 }
2108 
2109 /** send all auths, not just the last one **/
2110 static int send_auth_info(zhandle_t *zh) {
2111     int rc = 0;
2112     auth_info *auth = NULL;
2113 
2114     zoo_lock_auth(zh);
2115     auth = zh->auth_h.auth;
2116     if (auth == NULL) {
2117         zoo_unlock_auth(zh);
2118         return ZOK;
2119     }
2120     while (auth != NULL) {
2121         rc = send_info_packet(zh, auth);
2122         auth = auth->next;
2123     }
2124     zoo_unlock_auth(zh);
2125     LOG_DEBUG(LOGCALLBACK(zh), "Sending all auth info request to %s", zoo_get_current_server(zh));
2126     return (rc <0) ? ZMARSHALLINGERROR:ZOK;
2127 }
2128 
2129 static int send_last_auth_info(zhandle_t *zh)
2130 {
2131     int rc = 0;
2132     auth_info *auth = NULL;
2133 
2134     zoo_lock_auth(zh);
2135     auth = get_last_auth(&zh->auth_h);
2136     if(auth==NULL) {
2137       zoo_unlock_auth(zh);
2138       return ZOK; // there is nothing to send
2139     }
2140     rc = send_info_packet(zh, auth);
2141     zoo_unlock_auth(zh);
2142     LOG_DEBUG(LOGCALLBACK(zh), "Sending auth info request to %s",zoo_get_current_server(zh));
2143     return (rc < 0)?ZMARSHALLINGERROR:ZOK;
2144 }
2145 
/* Frees the first `count` strings stored in `list`, then the array itself. */
static void free_key_list(char **list, int count)
{
    int idx = 0;

    while (idx < count) {
        free(list[idx]);
        ++idx;
    }
    free(list);
}
2155 
2156 static int send_set_watches(zhandle_t *zh)
2157 {
2158     struct oarchive *oa;
2159     struct RequestHeader h = {SET_WATCHES_XID, ZOO_SETWATCHES_OP};
2160     struct SetWatches req;
2161     int rc;
2162 
2163     req.relativeZxid = zh->last_zxid;
2164     lock_watchers(zh);
2165     req.dataWatches.data = collect_keys(zh->active_node_watchers, (int*)&req.dataWatches.count);
2166     req.existWatches.data = collect_keys(zh->active_exist_watchers, (int*)&req.existWatches.count);
2167     req.childWatches.data = collect_keys(zh->active_child_watchers, (int*)&req.childWatches.count);
2168     unlock_watchers(zh);
2169 
2170     // return if there are no pending watches
2171     if (!req.dataWatches.count && !req.existWatches.count &&
2172         !req.childWatches.count) {
2173         free_key_list(req.dataWatches.data, req.dataWatches.count);
2174         free_key_list(req.existWatches.data, req.existWatches.count);
2175         free_key_list(req.childWatches.data, req.childWatches.count);
2176         return ZOK;
2177     }
2178 
2179 
2180     oa = create_buffer_oarchive();
2181     rc = serialize_RequestHeader(oa, "header", &h);
2182     rc = rc < 0 ? rc : serialize_SetWatches(oa, "req", &req);
2183     /* add this buffer to the head of the send queue */
2184     rc = rc < 0 ? rc : queue_front_buffer_bytes(&zh->to_send, get_buffer(oa),
2185             get_buffer_len(oa));
2186     /* We queued the buffer, so don't free it */
2187     close_buffer_oarchive(&oa, 0);
2188     free_key_list(req.dataWatches.data, req.dataWatches.count);
2189     free_key_list(req.existWatches.data, req.existWatches.count);
2190     free_key_list(req.childWatches.data, req.childWatches.count);
2191     LOG_DEBUG(LOGCALLBACK(zh), "Sending set watches request to %s",zoo_get_current_server(zh));
2192     return (rc < 0)?ZMARSHALLINGERROR:ZOK;
2193 }
2194 
2195 static int serialize_prime_connect(struct connect_req *req, char* buffer){
2196     //this should be the order of serialization
2197     int offset = 0;
2198     req->protocolVersion = htonl(req->protocolVersion);
2199     memcpy(buffer + offset, &req->protocolVersion, sizeof(req->protocolVersion));
2200     offset = offset +  sizeof(req->protocolVersion);
2201 
2202     req->lastZxidSeen = zoo_htonll(req->lastZxidSeen);
2203     memcpy(buffer + offset, &req->lastZxidSeen, sizeof(req->lastZxidSeen));
2204     offset = offset +  sizeof(req->lastZxidSeen);
2205 
2206     req->timeOut = htonl(req->timeOut);
2207     memcpy(buffer + offset, &req->timeOut, sizeof(req->timeOut));
2208     offset = offset +  sizeof(req->timeOut);
2209 
2210     req->sessionId = zoo_htonll(req->sessionId);
2211     memcpy(buffer + offset, &req->sessionId, sizeof(req->sessionId));
2212     offset = offset +  sizeof(req->sessionId);
2213 
2214     req->passwd_len = htonl(req->passwd_len);
2215     memcpy(buffer + offset, &req->passwd_len, sizeof(req->passwd_len));
2216     offset = offset +  sizeof(req->passwd_len);
2217 
2218     memcpy(buffer + offset, req->passwd, sizeof(req->passwd));
2219     offset = offset +  sizeof(req->passwd);
2220 
2221     memcpy(buffer + offset, &req->readOnly, sizeof(req->readOnly));
2222 
2223     return 0;
2224 }
2225 
2226 static int deserialize_prime_response(struct prime_struct *resp, char* buffer)
2227 {
2228      //this should be the order of deserialization
2229      int offset = 0;
2230      memcpy(&resp->len, buffer + offset, sizeof(resp->len));
2231      offset = offset +  sizeof(resp->len);
2232 
2233      resp->len = ntohl(resp->len);
2234      memcpy(&resp->protocolVersion,
2235             buffer + offset,
2236             sizeof(resp->protocolVersion));
2237      offset = offset +  sizeof(resp->protocolVersion);
2238 
2239      resp->protocolVersion = ntohl(resp->protocolVersion);
2240      memcpy(&resp->timeOut, buffer + offset, sizeof(resp->timeOut));
2241      offset = offset +  sizeof(resp->timeOut);
2242 
2243      resp->timeOut = ntohl(resp->timeOut);
2244      memcpy(&resp->sessionId, buffer + offset, sizeof(resp->sessionId));
2245      offset = offset +  sizeof(resp->sessionId);
2246 
2247      resp->sessionId = zoo_htonll(resp->sessionId);
2248      memcpy(&resp->passwd_len, buffer + offset, sizeof(resp->passwd_len));
2249      offset = offset +  sizeof(resp->passwd_len);
2250 
2251      resp->passwd_len = ntohl(resp->passwd_len);
2252      memcpy(resp->passwd, buffer + offset, sizeof(resp->passwd));
2253      offset = offset +  sizeof(resp->passwd);
2254 
2255      memcpy(&resp->readOnly, buffer + offset, sizeof(resp->readOnly));
2256 
2257      return 0;
2258 }
2259 
2260 static int prime_connection(zhandle_t *zh)
2261 {
2262     int rc;
2263     /*this is the size of buffer to serialize req into*/
2264     char buffer_req[HANDSHAKE_REQ_SIZE];
2265     int len = sizeof(buffer_req);
2266     int hlen = 0;
2267     struct connect_req req;
2268 
2269     if (zh->state == ZOO_SSL_CONNECTING_STATE) {
2270        // The SSL connection is yet to happen.
2271        return ZOK;
2272     }
2273     req.protocolVersion = 0;
2274     req.sessionId = zh->seen_rw_server_before ? zh->client_id.client_id : 0;
2275     req.passwd_len = sizeof(req.passwd);
2276     memcpy(req.passwd, zh->client_id.passwd, sizeof(zh->client_id.passwd));
2277     req.timeOut = zh->recv_timeout;
2278     req.lastZxidSeen = zh->last_zxid;
2279     req.readOnly = zh->allow_read_only;
2280     hlen = htonl(len);
2281     /* We are running fast and loose here, but this string should fit in the initial buffer! */
2282     rc=zookeeper_send(zh->fd, &hlen, sizeof(len));
2283     serialize_prime_connect(&req, buffer_req);
2284     rc=rc<0 ? rc : zookeeper_send(zh->fd, buffer_req, len);
2285     if (rc<0) {
2286         return handle_socket_error_msg(zh, __LINE__, ZCONNECTIONLOSS,
2287                 "failed to send a handshake packet: %s", strerror(errno));
2288     }
2289     zh->state = ZOO_ASSOCIATING_STATE;
2290 
2291     zh->input_buffer = &zh->primer_buffer;
2292     memset(zh->input_buffer->buffer, 0, zh->input_buffer->len);
2293 
2294     /* This seems a bit weird to to set the offset to 4, but we already have a
2295      * length, so we skip reading the length (and allocating the buffer) by
2296      * saying that we are already at offset 4 */
2297     zh->input_buffer->curr_offset = 4;
2298 
2299     return ZOK;
2300 }
2301 
/* Returns the time from *start to *end in milliseconds (negative when *end
 * is earlier than *start). Sub-millisecond parts of the microsecond
 * difference are truncated toward zero. */
static inline int calculate_interval(const struct timeval *start,
        const struct timeval *end)
{
    struct timeval delta;

    delta.tv_sec = end->tv_sec - start->tv_sec;
    delta.tv_usec = end->tv_usec - start->tv_usec;

    return (int)(delta.tv_sec * 1000 + (delta.tv_usec / 1000));
}
2312 
/* Converts a millisecond interval into a struct timeval.
 * Negative intervals are treated as zero. */
static struct timeval get_timeval(int interval)
{
    const int ms = (interval < 0) ? 0 : interval;
    struct timeval result;

    result.tv_sec = ms / 1000;
    result.tv_usec = (ms % 1000) * 1000;
    return result;
}
2323 
 /* Forward declarations of file-local helpers that register a completion
  * callback of the given kind (void, string, string+stat) together with its
  * user data for request `xid`; their definitions are not in this part of
  * the file. */
 static int add_void_completion(zhandle_t *zh, int xid, void_completion_t dc,
     const void *data);
 static int add_string_completion(zhandle_t *zh, int xid,
     string_completion_t dc, const void *data);
 static int add_string_stat_completion(zhandle_t *zh, int xid,
     string_stat_completion_t dc, const void *data);
2330 
2331 
2332  int send_ping(zhandle_t* zh)
2333  {
2334     int rc;
2335     struct oarchive *oa = create_buffer_oarchive();
2336     struct RequestHeader h = {PING_XID, ZOO_PING_OP};
2337 
2338     rc = serialize_RequestHeader(oa, "header", &h);
2339     enter_critical(zh);
2340     get_system_time(&zh->last_ping);
2341     rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
2342             get_buffer_len(oa));
2343     leave_critical(zh);
2344     close_buffer_oarchive(&oa, 0);
2345     return rc<0 ? rc : adaptor_send_queue(zh, 0);
2346 }
2347 
/* Bounds, in milliseconds, of the interval between two attempts to find a
 * r/w server while in read-only mode: the interval starts at MIN_RW_TIMEOUT
 * and is doubled after each attempt, capped at MAX_RW_TIMEOUT */
const int MAX_RW_TIMEOUT = 60000;
const int MIN_RW_TIMEOUT = 200;
2351 
2352 static int ping_rw_server(zhandle_t* zh)
2353 {
2354     char buf[10];
2355     zsock_t fd;
2356     int rc;
2357     sendsize_t ssize;
2358     int sock_flags;
2359 
2360     addrvec_peek(&zh->addrs, &zh->addr_rw_server);
2361 
2362 #ifdef SOCK_CLOEXEC_ENABLED
2363     sock_flags = SOCK_STREAM | SOCK_CLOEXEC;
2364 #else
2365     sock_flags = SOCK_STREAM;
2366 #endif
2367     fd.sock = socket(zh->addr_rw_server.ss_family, sock_flags, 0);
2368     if (fd.sock < 0) {
2369         return 0;
2370     }
2371 
2372     zookeeper_set_sock_nodelay(zh, fd.sock);
2373     zookeeper_set_sock_timeout(zh, fd.sock, 1);
2374 
2375     rc = zookeeper_connect(zh, &zh->addr_rw_server, fd.sock);
2376     if (rc < 0) {
2377         return 0;
2378     }
2379 
2380 #ifdef HAVE_OPENSSL_H
2381     fd.ssl_sock = NULL;
2382     fd.ssl_ctx = NULL;
2383 
2384     if (zh->fd->cert != NULL) {
2385         fd.cert = zh->fd->cert;
2386         rc = init_ssl_for_socket(&fd, zh, 0);
2387         if (rc != ZOK) {
2388             rc = 0;
2389             goto out;
2390         }
2391     }
2392 #endif
2393 
2394     ssize = zookeeper_send(&fd, "isro", 4);
2395     if (ssize < 0) {
2396         rc = 0;
2397         goto out;
2398     }
2399 
2400     memset(buf, 0, sizeof(buf));
2401     rc = zookeeper_recv(&fd, buf, sizeof(buf), 0);
2402     if (rc < 0) {
2403         rc = 0;
2404         goto out;
2405     }
2406 
2407     rc = strcmp("rw", buf) == 0;
2408 
2409 out:
2410     close_zsock(&fd);
2411     addr_rw_server = rc ? &zh->addr_rw_server : 0;
2412     return rc;
2413 }
2414 
#if !defined(WIN32) && !defined(min)
/* Returns the smaller of two ints; only defined when no min macro exists. */
static inline int min(int a, int b)
{
    if (a < b) {
        return a;
    }
    return b;
}
#endif
2421 
2422 static void zookeeper_set_sock_noblock(zhandle_t *zh, socket_t sock)
2423 {
2424 #ifdef _WIN32
2425     ULONG nonblocking_flag = 1;
2426 
2427     ioctlsocket(sock, FIONBIO, &nonblocking_flag);
2428 #else
2429     fcntl(sock, F_SETFL, O_NONBLOCK|fcntl(sock, F_GETFL, 0));
2430 #endif
2431 }
2432 
2433 static void zookeeper_set_sock_timeout(zhandle_t *zh, socket_t s, int timeout)
2434 {
2435     struct timeval tv;
2436 
2437     tv.tv_sec = timeout;
2438     setsockopt(s, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(struct timeval));
2439     setsockopt(s, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(struct timeval));
2440 }
2441 
2442 static void zookeeper_set_sock_nodelay(zhandle_t *zh, socket_t sock)
2443 {
2444 #ifdef _WIN32
2445     char enable_tcp_nodelay = 1;
2446 #else
2447     int enable_tcp_nodelay = 1;
2448 #endif
2449     int rc;
2450 
2451     rc = setsockopt(sock,
2452                     IPPROTO_TCP,
2453                     TCP_NODELAY,
2454                     &enable_tcp_nodelay,
2455                     sizeof(enable_tcp_nodelay));
2456 
2457     if (rc) {
2458         LOG_WARN(LOGCALLBACK(zh),
2459                  "Unable to set TCP_NODELAY, latency may be effected");
2460     }
2461 }
2462 
2463 static socket_t zookeeper_connect(zhandle_t *zh,
2464                                   struct sockaddr_storage *addr,
2465                                   socket_t fd)
2466 {
2467     int rc;
2468     int addr_len;
2469 
2470 #if defined(AF_INET6)
2471     if (addr->ss_family == AF_INET6) {
2472         addr_len = sizeof(struct sockaddr_in6);
2473     } else {
2474         addr_len = sizeof(struct sockaddr_in);
2475     }
2476 #else
2477     addr_len = sizeof(struct sockaddr_in);
2478 #endif
2479 
2480     LOG_DEBUG(LOGCALLBACK(zh), "[zk] connect()\n");
2481     rc = connect(fd, (struct sockaddr *)addr, addr_len);
2482 
2483 #ifdef _WIN32
2484     errno = GetLastError();
2485 
2486 #ifndef EWOULDBLOCK
2487 #define EWOULDBLOCK WSAEWOULDBLOCK
2488 #endif
2489 
2490 #ifndef EINPROGRESS
2491 #define EINPROGRESS WSAEINPROGRESS
2492 #endif
2493 
2494 #if _MSC_VER >= 1600
2495     switch(errno) {
2496     case WSAEWOULDBLOCK:
2497         errno = EWOULDBLOCK;
2498         break;
2499     case WSAEINPROGRESS:
2500         errno = EINPROGRESS;
2501         break;
2502     }
2503 #endif
2504 #endif
2505 
2506     return rc;
2507 }
2508 
2509 int zookeeper_interest(zhandle_t *zh, socket_t *fd, int *interest,
2510      struct timeval *tv)
2511 {
2512     int sock_flags;
2513     int rc = 0;
2514     struct timeval now;
2515 
2516 #ifdef SOCK_CLOEXEC_ENABLED
2517     sock_flags = SOCK_STREAM | SOCK_CLOEXEC;
2518 #else
2519     sock_flags = SOCK_STREAM;
2520 #endif
2521 
2522     if(zh==0 || fd==0 ||interest==0 || tv==0)
2523         return ZBADARGUMENTS;
2524     if (is_unrecoverable(zh))
2525         return ZINVALIDSTATE;
2526     get_system_time(&now);
2527     if(zh->next_deadline.tv_sec!=0 || zh->next_deadline.tv_usec!=0){
2528         int time_left = calculate_interval(&zh->next_deadline, &now);
2529         int max_exceed = zh->recv_timeout / 10 > 200 ? 200 :
2530                          (zh->recv_timeout / 10);
2531         if (time_left > max_exceed)
2532             LOG_WARN(LOGCALLBACK(zh), "Exceeded deadline by %dms", time_left);
2533     }
2534     api_prolog(zh);
2535 
2536     rc = update_addrs(zh, &now);
2537     if (rc != ZOK) {
2538         return api_epilog(zh, rc);
2539     }
2540 
2541     *fd = zh->fd->sock;
2542     *interest = 0;
2543     tv->tv_sec = 0;
2544     tv->tv_usec = 0;
2545 
2546     if (*fd == -1) {
2547         /*
2548          * If we previously failed to connect to server pool (zh->delay == 1)
2549          * then we need delay our connection on this iteration 1/60 of the
2550          * recv timeout before trying again so we don't spin.
2551          *
2552          * We always clear the delay setting. If we fail again, we'll set delay
2553          * again and on the next iteration we'll do the same.
2554          *
2555          * We will also delay if the disable_reconnection_attempt is set.
2556          */
2557         if (zh->delay == 1 || zh->disable_reconnection_attempt == 1) {
2558             *tv = get_timeval(zh->recv_timeout/60);
2559             zh->delay = 0;
2560 
2561             LOG_WARN(LOGCALLBACK(zh), "Delaying connection after exhaustively trying all servers [%s]",
2562                      zh->hostname);
2563         } else {
2564             if (addr_rw_server) {
2565                 zh->addr_cur = *addr_rw_server;
2566                 addr_rw_server = 0;
2567             } else {
2568                 // No need to delay -- grab the next server and attempt connection
2569                 zoo_cycle_next_server(zh);
2570             }
2571             zh->fd->sock  = socket(zh->addr_cur.ss_family, sock_flags, 0);
2572             if (zh->fd->sock < 0) {
2573               rc = handle_socket_error_msg(zh,
2574                                            __LINE__,
2575                                            ZSYSTEMERROR,
2576                                            "socket() call failed");
2577               return api_epilog(zh, rc);
2578             }
2579 
2580             zookeeper_set_sock_nodelay(zh, zh->fd->sock);
2581             zookeeper_set_sock_noblock(zh, zh->fd->sock);
2582 
2583             rc = zookeeper_connect(zh, &zh->addr_cur, zh->fd->sock);
2584 
2585             if (rc == -1) {
2586                 /* we are handling the non-blocking connect according to
2587                  * the description in section 16.3 "Non-blocking connect"
2588                  * in UNIX Network Programming vol 1, 3rd edition */
2589                 if (errno == EWOULDBLOCK || errno == EINPROGRESS) {
2590                     // For SSL, we first go to ZOO_SSL_CONNECTING_STATE
2591                     if (zh->fd->cert != NULL)
2592                         zh->state = ZOO_SSL_CONNECTING_STATE;
2593                     else
2594                         zh->state = ZOO_CONNECTING_STATE;
2595                 } else {
2596                     rc = handle_socket_error_msg(zh,
2597                                                  __LINE__,
2598                                                  ZCONNECTIONLOSS,
2599                                                  "connect() call failed");
2600                     return api_epilog(zh, rc);
2601                 }
2602             } else {
2603 #ifdef HAVE_OPENSSL_H
2604                 if (zh->fd->cert != NULL) {
2605                     // We do SSL_connect() here
2606                     if (init_ssl_for_handler(zh) != ZOK) {
2607                         return ZSSLCONNECTIONERROR;
2608                     }
2609                 }
2610 #endif
2611                 rc = prime_connection(zh);
2612                 if (rc != 0) {
2613                     return api_epilog(zh,rc);
2614                 }
2615 
2616                 LOG_INFO(LOGCALLBACK(zh),
2617                          "Initiated connection to server %s",
2618                          format_endpoint_info(&zh->addr_cur));
2619             }
2620             *tv = get_timeval(zh->recv_timeout/3);
2621         }
2622         *fd = zh->fd->sock;
2623         zh->last_recv = now;
2624         zh->last_send = now;
2625         zh->last_ping = now;
2626         zh->last_ping_rw = now;
2627         zh->ping_rw_timeout = MIN_RW_TIMEOUT;
2628     }
2629 
2630     if (zh->fd->sock != -1) {
2631         int idle_recv = calculate_interval(&zh->last_recv, &now);
2632         int idle_send = calculate_interval(&zh->last_send, &now);
2633         int recv_to = zh->recv_timeout*2/3 - idle_recv;
2634         int send_to = zh->recv_timeout/3;
2635         // have we exceeded the receive timeout threshold?
2636         if (recv_to <= 0 && zh->state != ZOO_SSL_CONNECTING_STATE) {
2637             // We gotta cut our losses and connect to someone else
2638 #ifdef _WIN32
2639             errno = WSAETIMEDOUT;
2640 #else
2641             errno = ETIMEDOUT;
2642 #endif
2643             *interest=0;
2644             *tv = get_timeval(0);
2645             return api_epilog(zh,handle_socket_error_msg(zh,
2646                     __LINE__,ZOPERATIONTIMEOUT,
2647                     "connection to %s timed out (exceeded timeout by %dms)",
2648                     format_endpoint_info(&zh->addr_cur),
2649                     -recv_to));
2650 
2651         }
2652 
2653         // We only allow 1/3 of our timeout time to expire before sending
2654         // a PING
2655         if (is_connected(zh)) {
2656             send_to = zh->recv_timeout/3 - idle_send;
2657             if (send_to <= 0) {
2658                 if (zh->sent_requests.head == 0) {
2659                     rc = send_ping(zh);
2660                     if (rc < 0) {
2661                         LOG_ERROR(LOGCALLBACK(zh), "failed to send PING request (zk retcode=%d)",rc);
2662                         return api_epilog(zh,rc);
2663                     }
2664                 }
2665                 send_to = zh->recv_timeout/3;
2666             }
2667         }
2668 
2669         // If we are in read-only mode, seek for read/write server
2670         if (zh->state == ZOO_READONLY_STATE) {
2671             int idle_ping_rw = calculate_interval(&zh->last_ping_rw, &now);
2672             if (idle_ping_rw >= zh->ping_rw_timeout) {
2673                 zh->last_ping_rw = now;
2674                 idle_ping_rw = 0;
2675                 zh->ping_rw_timeout = min(zh->ping_rw_timeout * 2,
2676                                           MAX_RW_TIMEOUT);
2677                 if (ping_rw_server(zh)) {
2678                     struct sockaddr_storage addr;
2679                     addrvec_peek(&zh->addrs, &addr);
2680                     zh->ping_rw_timeout = MIN_RW_TIMEOUT;
2681                     LOG_INFO(LOGCALLBACK(zh),
2682                              "r/w server found at %s",
2683                              format_endpoint_info(&addr));
2684                     cleanup(zh, ZOK);
2685                 } else {
2686                     addrvec_next(&zh->addrs, NULL);
2687                 }
2688             }
2689             send_to = min(send_to, zh->ping_rw_timeout - idle_ping_rw);
2690         }
2691 
2692         // choose the lesser value as the timeout
2693         *tv = get_timeval(min(recv_to, send_to));
2694 
2695         zh->next_deadline.tv_sec = now.tv_sec + tv->tv_sec;
2696         zh->next_deadline.tv_usec = now.tv_usec + tv->tv_usec;
2697         if (zh->next_deadline.tv_usec > 1000000) {
2698             zh->next_deadline.tv_sec += zh->next_deadline.tv_usec / 1000000;
2699             zh->next_deadline.tv_usec = zh->next_deadline.tv_usec % 1000000;
2700         }
2701         *interest = ZOOKEEPER_READ;
2702         /* we are interested in a write if we are connected and have something
2703          * to send, or we are waiting for a connect to finish. */
2704         if ((zh->to_send.head && (is_connected(zh) || is_sasl_auth_in_progress(zh)))
2705             || zh->state == ZOO_CONNECTING_STATE
2706             || zh->state == ZOO_SSL_CONNECTING_STATE) {
2707             *interest |= ZOOKEEPER_WRITE;
2708         }
2709     }
2710     return api_epilog(zh,ZOK);
2711 }
2712 
2713 #ifdef HAVE_OPENSSL_H
2714 
2715 /*
2716  * use this function, if you want to init SSL for the socket currently registered in the zookeeper handler
2717  */
2718 static int init_ssl_for_handler(zhandle_t *zh)
2719 {
2720     int rc = init_ssl_for_socket(zh->fd, zh, 1);
2721     if (rc == ZOK) {
2722         // (SUCCESS) Now mark the ZOO_CONNECTING_STATE so that
2723         // prime_connection() happen.
2724         // prime_connection() only happens in ZOO_CONNECTING_STATE
2725         zh->state = ZOO_CONNECTING_STATE;
2726     }
2727     return rc;
2728 }
2729 
2730 /*
2731  * use this function, if you want to init SSL for a socket, pointing to a different server address than the one
2732  * currently registered in the zookeeper handler (e.g. ping other servers when you are connected to a read-only one)
2733  */
2734 static int init_ssl_for_socket(zsock_t *fd, zhandle_t *zh, int fail_on_error) {
2735 
2736     SSL_CTX **ctx;
2737 
2738     if (!fd->ssl_sock) {
2739         const SSL_METHOD *method;
2740 
2741 #if OPENSSL_VERSION_NUMBER < 0x10100000L
2742         OpenSSL_add_all_algorithms();
2743         ERR_load_BIO_strings();
2744         ERR_load_crypto_strings();
2745         SSL_load_error_strings();
2746         SSL_library_init();
2747         method = SSLv23_client_method();
2748 #else
2749         OPENSSL_init_ssl(OPENSSL_INIT_LOAD_SSL_STRINGS | OPENSSL_INIT_LOAD_CRYPTO_STRINGS, NULL);
2750         method = TLS_client_method();
2751 #endif
2752         if (FIPS_mode() == 0) {
2753             LOG_INFO(LOGCALLBACK(zh), "FIPS mode is OFF ");
2754         } else {
2755             LOG_INFO(LOGCALLBACK(zh), "FIPS mode is ON ");
2756         }
2757         fd->ssl_ctx = SSL_CTX_new(method);
2758         ctx = &fd->ssl_ctx;
2759 
2760         SSL_CTX_set_verify(*ctx, SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT, 0);
2761         /*SERVER CA FILE*/
2762         if (SSL_CTX_load_verify_locations(*ctx, fd->cert->ca, 0) != 1) {
2763             SSL_CTX_free(*ctx);
2764             LOG_ERROR(LOGCALLBACK(zh), "Failed to load CA file %s", fd->cert->ca);
2765             errno = EINVAL;
2766             return ZBADARGUMENTS;
2767         }
2768         if (SSL_CTX_set_default_verify_paths(*ctx) != 1) {
2769             SSL_CTX_free(*ctx);
2770             LOG_ERROR(LOGCALLBACK(zh), "Call to SSL_CTX_set_default_verify_paths failed");
2771             errno = EINVAL;
2772             return ZBADARGUMENTS;
2773         }
2774         /*CLIENT CA FILE (With Certificate Chain)*/
2775         if (SSL_CTX_use_certificate_chain_file(*ctx, fd->cert->cert) != 1) {
2776             SSL_CTX_free(*ctx);
2777             LOG_ERROR(LOGCALLBACK(zh), "Failed to load client certificate chain from %s", fd->cert->cert);
2778             errno = EINVAL;
2779             return ZBADARGUMENTS;
2780         }
2781         /*CLIENT PRIVATE KEY*/
2782         SSL_CTX_set_default_passwd_cb_userdata(*ctx, fd->cert->passwd);
2783         if (SSL_CTX_use_PrivateKey_file(*ctx, fd->cert->key, SSL_FILETYPE_PEM) != 1) {
2784             SSL_CTX_free(*ctx);
2785             LOG_ERROR(LOGCALLBACK(zh), "Failed to load client private key from %s", fd->cert->key);
2786             errno = EINVAL;
2787             return ZBADARGUMENTS;
2788         }
2789         /*CHECK*/
2790         if (SSL_CTX_check_private_key(*ctx) != 1) {
2791             SSL_CTX_free(*ctx);
2792             LOG_ERROR(LOGCALLBACK(zh), "SSL_CTX_check_private_key failed");
2793             errno = EINVAL;
2794             return ZBADARGUMENTS;
2795         }
2796         /*MULTIPLE HANDSHAKE*/
2797         SSL_CTX_set_mode(*ctx, SSL_MODE_AUTO_RETRY);
2798 
2799         fd->ssl_sock = SSL_new(*ctx);
2800         if (fd->ssl_sock == NULL) {
2801             if (fail_on_error) {
2802                 return handle_socket_error_msg(zh,__LINE__,ZSSLCONNECTIONERROR, "error creating ssl context");
2803             } else {
2804                 LOG_ERROR(LOGCALLBACK(zh), "error creating ssl context");
2805                 return ZSSLCONNECTIONERROR;
2806             }
2807 
2808         }
2809         SSL_set_fd(fd->ssl_sock, fd->sock);
2810     }
2811     while(1) {
2812         int rc;
2813         int sock = fd->sock;
2814         struct timeval tv;
2815         fd_set s_rfds, s_wfds;
2816         tv.tv_sec = 1;
2817         tv.tv_usec = 0;
2818         FD_ZERO(&s_rfds);
2819         FD_ZERO(&s_wfds);
2820         rc = SSL_connect(fd->ssl_sock);
2821         if (rc == 1) {
2822             return ZOK;
2823         } else {
2824             rc = SSL_get_error(fd->ssl_sock, rc);
2825             if (rc == SSL_ERROR_WANT_READ) {
2826                 FD_SET(sock, &s_rfds);
2827                 FD_CLR(sock, &s_wfds);
2828             } else if (rc == SSL_ERROR_WANT_WRITE) {
2829                 FD_SET(sock, &s_wfds);
2830                 FD_CLR(sock, &s_rfds);
2831             } else {
2832                 if (fail_on_error) {
2833                     return handle_socket_error_msg(zh,__LINE__,ZSSLCONNECTIONERROR, "error in ssl connect");
2834                 } else {
2835                     LOG_ERROR(LOGCALLBACK(zh), "error in ssl connect");
2836                     return ZSSLCONNECTIONERROR;
2837                 }
2838             }
2839             rc = select(sock + 1, &s_rfds, &s_wfds, NULL, &tv);
2840             if (rc == -1) {
2841                 if (fail_on_error) {
2842                     return handle_socket_error_msg(zh,__LINE__,ZSSLCONNECTIONERROR, "error in ssl connect (after select)");
2843                 } else {
2844                     LOG_ERROR(LOGCALLBACK(zh), "error in ssl connect (after select)");
2845                     return ZSSLCONNECTIONERROR;
2846                 }
2847             }
2848         }
2849     }
2850 }
2851 
2852 
2853 #endif
2854 
2855 /*
2856  * the "bottom half" of the session establishment procedure, executed
2857  * either after receiving the "prime response," or after SASL
2858  * authentication is complete
2859  */
2860 static void finalize_session_establishment(zhandle_t *zh) {
2861     zh->state = zh->primer_storage.readOnly ?
2862         ZOO_READONLY_STATE : ZOO_CONNECTED_STATE;
2863     zh->reconfig = 0;
2864     LOG_INFO(LOGCALLBACK(zh),
2865              "session establishment complete on server %s, sessionId=%#llx, negotiated timeout=%d %s",
2866              format_endpoint_info(&zh->addr_cur),
2867              zh->client_id.client_id, zh->recv_timeout,
2868              zh->primer_storage.readOnly ? "(READ-ONLY mode)" : "");
2869     /* we want the auth to be sent for, but since both call push to front
2870        we need to call send_watch_set first */
2871     send_set_watches(zh);
2872     /* send the authentication packet now */
2873     send_auth_info(zh);
2874     LOG_DEBUG(LOGCALLBACK(zh), "Calling a watcher for a ZOO_SESSION_EVENT and the state=ZOO_CONNECTED_STATE");
2875     zh->input_buffer = 0; // just in case the watcher calls zookeeper_process() again
2876     PROCESS_SESSION_EVENT(zh, zh->state);
2877 
2878     if (has_sasl_client(zh)) {
2879         /* some packets might have been delayed during SASL negotiaton. */
2880         adaptor_send_queue(zh, 0);
2881     }
2882 }
2883 
2884 #ifdef HAVE_CYRUS_SASL_H
2885 
2886 /*
2887  * queue an encoded SASL request to ZooKeeper.  The packet is added to
2888  * the front of the queue.
2889  *
2890  * \param zh the ZooKeeper handle
2891  * \param client_data the encoded SASL data, ready to send
2892  * \param client_data_len the length of \c client_data
2893  * \return ZOK on success, or ZMARSHALLINGERROR if something went wrong
2894  */
2895 int queue_sasl_request(zhandle_t *zh, const char *client_data, int client_data_len)
2896 {
2897     struct oarchive *oa;
2898     int rc;
2899 
2900     /* Java client use normal xid, too. */
2901     struct RequestHeader h = { get_xid(), ZOO_SASL_OP };
2902     struct GetSASLRequest req = { { client_data_len, client_data_len>0 ? (char *) client_data : "" } };
2903 
2904     oa = create_buffer_oarchive();
2905     rc = serialize_RequestHeader(oa, "header", &h);
2906     rc = rc < 0 ? rc : serialize_GetSASLRequest(oa, "req", &req);
2907     rc = rc < 0 ? rc : queue_front_buffer_bytes(&zh->to_send, get_buffer(oa),
2908          get_buffer_len(oa));
2909     close_buffer_oarchive(&oa, 0);
2910 
2911     LOG_DEBUG(LOGCALLBACK(zh),
2912         "SASL: Queued request len=%d rc=%d", client_data_len, rc);
2913 
2914     return (rc < 0) ? ZMARSHALLINGERROR : ZOK;
2915 }
2916 
2917 /*
2918  * decode an expected SASL response and perform the corresponding
2919  * authentication step
2920  */
2921 static int process_sasl_response(zhandle_t *zh, char *buffer, int len)
2922 {
2923     struct iarchive *ia = create_buffer_iarchive(buffer, len);
2924     struct ReplyHeader hdr;
2925     struct SetSASLResponse res;
2926     int rc;
2927 
2928     rc = ia ? ZOK : ZSYSTEMERROR;
2929     rc = rc < 0 ? rc : deserialize_ReplyHeader(ia, "hdr", &hdr);
2930     rc = rc < 0 ? rc : deserialize_SetSASLResponse(ia, "reply", &res);
2931     rc = rc < 0 ? rc : zoo_sasl_client_step(zh, res.token.buff, res.token.len);
2932     deallocate_SetSASLResponse(&res);
2933     if (ia) {
2934         close_buffer_iarchive(&ia);
2935     }
2936 
2937     LOG_DEBUG(LOGCALLBACK(zh),
2938         "SASL: Processed response len=%d rc=%d", len, rc);
2939 
2940     return rc;
2941 }
2942 
2943 #endif /* HAVE_CYRUS_SASL_H */
2944 
2945 static int check_events(zhandle_t *zh, int events)
2946 {
2947     if (zh->fd->sock == -1)
2948         return ZINVALIDSTATE;
2949 
2950 #ifdef HAVE_OPENSSL_H
2951     if ((events&ZOOKEEPER_WRITE) && (zh->state == ZOO_SSL_CONNECTING_STATE) && zh->fd->cert != NULL) {
2952         int rc, error;
2953         socklen_t len = sizeof(error);
2954         rc = getsockopt(zh->fd->sock, SOL_SOCKET, SO_ERROR, &error, &len);
2955         /* the description in section 16.4 "Non-blocking connect"
2956          * in UNIX Network Programming vol 1, 3rd edition, points out
2957          * that sometimes the error is in errno and sometimes in error */
2958         if (rc < 0 || error) {
2959             if (rc == 0)
2960                 errno = error;
2961             return handle_socket_error_msg(zh, __LINE__,ZCONNECTIONLOSS,
2962                 "server refused to accept the client");
2963         }
2964         // We do SSL_connect() here
2965         if (init_ssl_for_handler(zh) != ZOK) {
2966             return ZSSLCONNECTIONERROR;
2967         }
2968     }
2969 #endif
2970 
2971     if ((events&ZOOKEEPER_WRITE)&&(zh->state == ZOO_CONNECTING_STATE)) {
2972         int rc, error;
2973         socklen_t len = sizeof(error);
2974         rc = getsockopt(zh->fd->sock, SOL_SOCKET, SO_ERROR, &error, &len);
2975         /* the description in section 16.4 "Non-blocking connect"
2976          * in UNIX Network Programming vol 1, 3rd edition, points out
2977          * that sometimes the error is in errno and sometimes in error */
2978         if (rc < 0 || error) {
2979             if (rc == 0)
2980                 errno = error;
2981             return handle_socket_error_msg(zh, __LINE__,ZCONNECTIONLOSS,
2982                 "server refused to accept the client");
2983         }
2984 
2985         if((rc=prime_connection(zh))!=0)
2986             return rc;
2987 
2988         LOG_INFO(LOGCALLBACK(zh), "initiated connection to server %s", format_endpoint_info(&zh->addr_cur));
2989         return ZOK;
2990     }
2991 
2992     if (zh->to_send.head && (events&ZOOKEEPER_WRITE)) {
2993         /* make the flush call non-blocking by specifying a 0 timeout */
2994         int rc=flush_send_queue(zh,0);
2995         if (rc < 0)
2996             return handle_socket_error_msg(zh,__LINE__,ZCONNECTIONLOSS,
2997                 "failed while flushing send queue");
2998     }
2999     if (events&ZOOKEEPER_READ) {
3000         int rc;
3001         if (zh->input_buffer == 0) {
3002             zh->input_buffer = allocate_buffer(0,0);
3003         }
3004 
3005         rc = recv_buffer(zh, zh->input_buffer);
3006         if (rc < 0) {
3007             return handle_socket_error_msg(zh, __LINE__,ZCONNECTIONLOSS,
3008                 "failed while receiving a server response");
3009         }
3010         if (rc > 0) {
3011             get_system_time(&zh->last_recv);
3012             if (zh->input_buffer != &zh->primer_buffer) {
3013                 if (is_connected(zh) || !is_sasl_auth_in_progress(zh)) {
3014                     queue_buffer(&zh->to_process, zh->input_buffer, 0);
3015 #ifdef HAVE_CYRUS_SASL_H
3016                 } else {
3017                     rc = process_sasl_response(zh, zh->input_buffer->buffer, zh->input_buffer->curr_offset);
3018                     free_buffer(zh->input_buffer);
3019                     if (rc < 0) {
3020                         zoo_sasl_mark_failed(zh);
3021                         return rc;
3022                     } else if (zh->sasl_client->state == ZOO_SASL_COMPLETE) {
3023                         /*
3024                          * SASL authentication just completed; send
3025                          * watches, auth. info, etc. now.
3026                          */
3027                         finalize_session_establishment(zh);
3028                     }
3029 #endif /* HAVE_CYRUS_SASL_H */
3030                 }
3031             } else  {
3032                 int64_t oldid, newid;
3033                 //deserialize
3034                 deserialize_prime_response(&zh->primer_storage, zh->primer_buffer.buffer);
3035                 /* We are processing the primer_buffer, so we need to finish
3036                  * the connection handshake */
3037                 oldid = zh->seen_rw_server_before ? zh->client_id.client_id : 0;
3038                 zh->seen_rw_server_before |= !zh->primer_storage.readOnly;
3039                 newid = zh->primer_storage.sessionId;
3040                 if (oldid != 0 && oldid != newid) {
3041                     zh->state = ZOO_EXPIRED_SESSION_STATE;
3042                     errno = ESTALE;
3043                     return handle_socket_error_msg(zh,__LINE__,ZSESSIONEXPIRED,
3044                             "sessionId=%#llx has expired.",oldid);
3045                 } else {
3046                     zh->recv_timeout = zh->primer_storage.timeOut;
3047                     zh->client_id.client_id = newid;
3048 
3049                     memcpy(zh->client_id.passwd, &zh->primer_storage.passwd,
3050                            sizeof(zh->client_id.passwd));
3051 
3052 #ifdef HAVE_CYRUS_SASL_H
3053                     if (zh->sasl_client) {
3054                         /*
3055                          * Start a SASL authentication session.
3056                          * Watches, auth. info, etc. will be sent
3057                          * after it completes.
3058                          */
3059                         rc = zoo_sasl_connect(zh);
3060                         rc = rc < 0 ? rc : zoo_sasl_client_start(zh);
3061                         if (rc < 0) {
3062                             zoo_sasl_mark_failed(zh);
3063                             return rc;
3064                         }
3065                     } else {
3066                         /* Can send watches, auth. info, etc. immediately. */
3067                         finalize_session_establishment(zh);
3068                     }
3069 #else /* HAVE_CYRUS_SASL_H */
3070                     /* Can send watches, auth. info, etc. immediately. */
3071                     finalize_session_establishment(zh);
3072 #endif /* HAVE_CYRUS_SASL_H */
3073                 }
3074             }
3075             zh->input_buffer = 0;
3076         } else {
3077             // zookeeper_process was called but there was nothing to read
3078             // from the socket
3079             return ZNOTHING;
3080         }
3081     }
3082     return ZOK;
3083 }
3084 
3085 void api_prolog(zhandle_t* zh)
3086 {
3087     inc_ref_counter(zh,1);
3088 }
3089 
3090 int api_epilog(zhandle_t *zh,int rc)
3091 {
3092     if(inc_ref_counter(zh,-1)==0 && zh->close_requested!=0)
3093         zookeeper_close(zh);
3094     return rc;
3095 }
3096 
3097 //#ifdef THREADED
3098 // IO thread queues session events to be processed by the completion thread
3099 static int queue_session_event(zhandle_t *zh, int state)
3100 {
3101     int rc;
3102     struct WatcherEvent evt = { ZOO_SESSION_EVENT, state, "" };
3103     struct ReplyHeader hdr = { WATCHER_EVENT_XID, 0, 0 };
3104     struct oarchive *oa;
3105     completion_list_t *cptr;
3106 
3107     if ((oa=create_buffer_oarchive())==NULL) {
3108         LOG_ERROR(LOGCALLBACK(zh), "out of memory");
3109         goto error;
3110     }
3111     rc = serialize_ReplyHeader(oa, "hdr", &hdr);
3112     rc = rc<0?rc: serialize_WatcherEvent(oa, "event", &evt);
3113     if(rc<0){
3114         close_buffer_oarchive(&oa, 1);
3115         goto error;
3116     }
3117     cptr = create_completion_entry(zh, WATCHER_EVENT_XID,-1,0,0,0,0);
3118     cptr->buffer = allocate_buffer(get_buffer(oa), get_buffer_len(oa));
3119     cptr->buffer->curr_offset = get_buffer_len(oa);
3120     if (!cptr->buffer) {
3121         free(cptr);
3122         close_buffer_oarchive(&oa, 1);
3123         goto error;
3124     }
3125     /* We queued the buffer, so don't free it */
3126     close_buffer_oarchive(&oa, 0);
3127     lock_watchers(zh);
3128     cptr->c.watcher_result = collectWatchers(zh, ZOO_SESSION_EVENT, "");
3129     unlock_watchers(zh);
3130     queue_completion(&zh->completions_to_process, cptr, 0);
3131     if (process_async(zh->outstanding_sync)) {
3132         process_completions(zh);
3133     }
3134     return ZOK;
3135 error:
3136     errno=ENOMEM;
3137     return ZSYSTEMERROR;
3138 }
3139 //#endif
3140 
3141 completion_list_t *dequeue_completion(completion_head_t *list)
3142 {
3143     completion_list_t *cptr;
3144     lock_completion_list(list);
3145     cptr = list->head;
3146     if (cptr) {
3147         list->head = cptr->next;
3148         if (!list->head) {
3149             assert(list->last == cptr);
3150             list->last = 0;
3151         }
3152     }
3153     unlock_completion_list(list);
3154     return cptr;
3155 }
3156 
3157 // cleanup completion list of a failed multi request
3158 static void cleanup_failed_multi(zhandle_t *zh, int xid, int rc, completion_list_t *cptr) {
3159     completion_list_t *entry;
3160     completion_head_t *clist = &cptr->c.clist;
3161     while ((entry = dequeue_completion(clist)) != NULL) {
3162         // Fake failed response for all sub-requests
3163         deserialize_response(zh, entry->c.type, xid, 1, rc, entry, NULL);
3164         destroy_completion_entry(entry);
3165     }
3166 }
3167 
3168 static int deserialize_multi(zhandle_t *zh, int xid, completion_list_t *cptr, struct iarchive *ia)
3169 {
3170     int rc = 0;
3171     completion_head_t *clist = &cptr->c.clist;
3172     struct MultiHeader mhdr = {0, 0, 0};
3173     assert(clist);
3174     deserialize_MultiHeader(ia, "multiheader", &mhdr);
3175     while (!mhdr.done) {
3176         completion_list_t *entry = dequeue_completion(clist);
3177         assert(entry);
3178 
3179         if (mhdr.type == -1) {
3180             struct ErrorResponse er;
3181             deserialize_ErrorResponse(ia, "error", &er);
3182             mhdr.err = er.err ;
3183             if (rc == 0 && er.err != 0 && er.err != ZRUNTIMEINCONSISTENCY) {
3184                 rc = er.err;
3185             }
3186         }
3187 
3188         deserialize_response(zh, entry->c.type, xid, mhdr.type == -1, mhdr.err, entry, ia);
3189         deserialize_MultiHeader(ia, "multiheader", &mhdr);
3190         //While deserializing the response we must destroy completion entry for each operation in
3191         //the zoo_multi transaction. Otherwise this results in memory leak when client invokes zoo_multi
3192         //operation.
3193         destroy_completion_entry(entry);
3194     }
3195 
3196     return rc;
3197 }
3198 
3199 static void deserialize_response(zhandle_t *zh, int type, int xid, int failed, int rc, completion_list_t *cptr, struct iarchive *ia)
3200 {
3201     switch (type) {
3202     case COMPLETION_DATA:
3203         LOG_DEBUG(LOGCALLBACK(zh), "Calling COMPLETION_DATA for xid=%#x failed=%d rc=%d",
3204                     cptr->xid, failed, rc);
3205         if (failed) {
3206             cptr->c.data_result(rc, 0, 0, 0, cptr->data);
3207         } else {
3208             struct GetDataResponse res;
3209             deserialize_GetDataResponse(ia, "reply", &res);
3210             cptr->c.data_result(rc, res.data.buff, res.data.len,
3211                     &res.stat, cptr->data);
3212             deallocate_GetDataResponse(&res);
3213         }
3214         break;
3215     case COMPLETION_STAT:
3216         LOG_DEBUG(LOGCALLBACK(zh), "Calling COMPLETION_STAT for xid=%#x failed=%d rc=%d",
3217                     cptr->xid, failed, rc);
3218         if (failed) {
3219             cptr->c.stat_result(rc, 0, cptr->data);
3220         } else {
3221             struct SetDataResponse res;
3222             deserialize_SetDataResponse(ia, "reply", &res);
3223             cptr->c.stat_result(rc, &res.stat, cptr->data);
3224             deallocate_SetDataResponse(&res);
3225         }
3226         break;
3227     case COMPLETION_STRINGLIST:
3228         LOG_DEBUG(LOGCALLBACK(zh), "Calling COMPLETION_STRINGLIST for xid=%#x failed=%d rc=%d",
3229                     cptr->xid, failed, rc);
3230         if (failed) {
3231             cptr->c.strings_result(rc, 0, cptr->data);
3232         } else {
3233             struct GetChildrenResponse res;
3234             deserialize_GetChildrenResponse(ia, "reply", &res);
3235             cptr->c.strings_result(rc, &res.children, cptr->data);
3236             deallocate_GetChildrenResponse(&res);
3237         }
3238         break;
3239     case COMPLETION_STRINGLIST_STAT:
3240         LOG_DEBUG(LOGCALLBACK(zh), "Calling COMPLETION_STRINGLIST_STAT for xid=%#x failed=%d rc=%d",
3241                     cptr->xid, failed, rc);
3242         if (failed) {
3243             cptr->c.strings_stat_result(rc, 0, 0, cptr->data);
3244         } else {
3245             struct GetChildren2Response res;
3246             deserialize_GetChildren2Response(ia, "reply", &res);
3247             cptr->c.strings_stat_result(rc, &res.children, &res.stat, cptr->data);
3248             deallocate_GetChildren2Response(&res);
3249         }
3250         break;
3251     case COMPLETION_STRING:
3252         LOG_DEBUG(LOGCALLBACK(zh), "Calling COMPLETION_STRING for xid=%#x failed=%d, rc=%d",
3253                     cptr->xid, failed, rc);
3254         if (failed) {
3255             cptr->c.string_result(rc, 0, cptr->data);
3256         } else {
3257             struct CreateResponse res;
3258             const char *client_path;
3259             memset(&res, 0, sizeof(res));
3260             deserialize_CreateResponse(ia, "reply", &res);
3261             client_path = sub_string(zh, res.path);
3262             cptr->c.string_result(rc, client_path, cptr->data);
3263             free_duplicate_path(client_path, res.path);
3264             deallocate_CreateResponse(&res);
3265         }
3266         break;
3267     case COMPLETION_STRING_STAT:
3268         LOG_DEBUG(LOGCALLBACK(zh), "Calling COMPLETION_STRING_STAT for xid=%#x failed=%d, rc=%d",
3269                     cptr->xid, failed, rc);
3270         if (failed) {
3271             cptr->c.string_stat_result(rc, 0, 0, cptr->data);
3272         } else {
3273             struct Create2Response res;
3274             const char *client_path;
3275             deserialize_Create2Response(ia, "reply", &res);
3276             client_path = sub_string(zh, res.path);
3277             cptr->c.string_stat_result(rc, client_path, &res.stat, cptr->data);
3278             free_duplicate_path(client_path, res.path);
3279             deallocate_Create2Response(&res);
3280         }
3281         break;
3282     case COMPLETION_ACLLIST:
3283         LOG_DEBUG(LOGCALLBACK(zh), "Calling COMPLETION_ACLLIST for xid=%#x failed=%d rc=%d",
3284                     cptr->xid, failed, rc);
3285         if (failed) {
3286             cptr->c.acl_result(rc, 0, 0, cptr->data);
3287         } else {
3288             struct GetACLResponse res;
3289             deserialize_GetACLResponse(ia, "reply", &res);
3290             cptr->c.acl_result(rc, &res.acl, &res.stat, cptr->data);
3291             deallocate_GetACLResponse(&res);
3292         }
3293         break;
3294     case COMPLETION_VOID:
3295         LOG_DEBUG(LOGCALLBACK(zh), "Calling COMPLETION_VOID for xid=%#x failed=%d rc=%d",
3296                     cptr->xid, failed, rc);
3297         assert(cptr->c.void_result);
3298         cptr->c.void_result(rc, cptr->data);
3299         break;
3300     case COMPLETION_MULTI:
3301         LOG_DEBUG(LOGCALLBACK(zh), "Calling COMPLETION_MULTI for xid=%#x failed=%d rc=%d",
3302                     cptr->xid, failed, rc);
3303         assert(cptr->c.void_result);
3304         if (failed) {
3305             cleanup_failed_multi(zh, xid, rc, cptr);
3306         } else {
3307             rc = deserialize_multi(zh, xid, cptr, ia);
3308         }
3309         cptr->c.void_result(rc, cptr->data);
3310         break;
3311     default:
3312         LOG_DEBUG(LOGCALLBACK(zh), "Unsupported completion type=%d", cptr->c.type);
3313     }
3314 }
3315 
3316 
3317 /* handles async completion (both single- and multithreaded) */
3318 void process_completions(zhandle_t *zh)
3319 {
3320     completion_list_t *cptr;
3321     while ((cptr = dequeue_completion(&zh->completions_to_process)) != 0) {
3322         struct ReplyHeader hdr;
3323         buffer_list_t *bptr = cptr->buffer;
3324         struct iarchive *ia = create_buffer_iarchive(bptr->buffer,
3325                 bptr->len);
3326         deserialize_ReplyHeader(ia, "hdr", &hdr);
3327 
3328         if (hdr.xid == WATCHER_EVENT_XID) {
3329             int type, state;
3330             struct WatcherEvent evt;
3331             deserialize_WatcherEvent(ia, "event", &evt);
3332             /* We are doing a notification, so there is no pending request */
3333             type = evt.type;
3334             state = evt.state;
3335             /* This is a notification so there aren't any pending requests */
3336             LOG_DEBUG(LOGCALLBACK(zh), "Calling a watcher for node [%s], type = %d event=%s",
3337                        (evt.path==NULL?"NULL":evt.path), cptr->c.type,
3338                        watcherEvent2String(type));
3339             deliverWatchers(zh,type,state,evt.path, &cptr->c.watcher_result);
3340             deallocate_WatcherEvent(&evt);
3341         } else {
3342             deserialize_response(zh, cptr->c.type, hdr.xid, hdr.err != 0, hdr.err, cptr, ia);
3343         }
3344         destroy_completion_entry(cptr);
3345         close_buffer_iarchive(&ia);
3346     }
3347 }
3348 
3349 static void isSocketReadable(zhandle_t* zh)
3350 {
3351 #ifndef _WIN32
3352     struct pollfd fds;
3353     fds.fd = zh->fd->sock;
3354     fds.events = POLLIN;
3355     if (poll(&fds,1,0)<=0) {
3356         // socket not readable -- no more responses to process
3357         zh->socket_readable.tv_sec=zh->socket_readable.tv_usec=0;
3358     }
3359 #else
3360     fd_set rfds;
3361     struct timeval waittime = {0, 0};
3362     FD_ZERO(&rfds);
3363     FD_SET( zh->fd , &rfds);
3364     if (select(0, &rfds, NULL, NULL, &waittime) <= 0){
3365         // socket not readable -- no more responses to process
3366         zh->socket_readable.tv_sec=zh->socket_readable.tv_usec=0;
3367     }
3368 #endif
3369     else{
3370         get_system_time(&zh->socket_readable);
3371     }
3372 }
3373 
3374 static void checkResponseLatency(zhandle_t* zh)
3375 {
3376     int delay;
3377     struct timeval now;
3378 
3379     if(zh->socket_readable.tv_sec==0)
3380         return;
3381 
3382     get_system_time(&now);
3383     delay=calculate_interval(&zh->socket_readable, &now);
3384     if(delay>20)
3385         LOG_DEBUG(LOGCALLBACK(zh), "The following server response has spent at least %dms sitting in the client socket recv buffer",delay);
3386 
3387     zh->socket_readable.tv_sec=zh->socket_readable.tv_usec=0;
3388 }
3389 
3390 int zookeeper_process(zhandle_t *zh, int events)
3391 {
3392     buffer_list_t *bptr;
3393     int rc;
3394 
3395     if (zh==NULL)
3396         return ZBADARGUMENTS;
3397     if (is_unrecoverable(zh))
3398         return ZINVALIDSTATE;
3399     api_prolog(zh);
3400     IF_DEBUG(checkResponseLatency(zh));
3401     rc = check_events(zh, events);
3402     if (rc!=ZOK)
3403         return api_epilog(zh, rc);
3404 
3405     IF_DEBUG(isSocketReadable(zh));
3406 
3407     while (rc >= 0 && (bptr=dequeue_buffer(&zh->to_process))) {
3408         struct ReplyHeader hdr;
3409         struct iarchive *ia = create_buffer_iarchive(
3410                                     bptr->buffer, bptr->curr_offset);
3411         deserialize_ReplyHeader(ia, "hdr", &hdr);
3412 
3413         if (hdr.xid == PING_XID) {
3414             // Ping replies can arrive out-of-order
3415             int elapsed = 0;
3416             struct timeval now;
3417             gettimeofday(&now, 0);
3418             elapsed = calculate_interval(&zh->last_ping, &now);
3419             LOG_DEBUG(LOGCALLBACK(zh), "Got ping response in %d ms", elapsed);
3420             free_buffer(bptr);
3421         } else if (hdr.xid == WATCHER_EVENT_XID) {
3422             struct WatcherEvent evt;
3423             int type = 0;
3424             char *path = NULL;
3425             completion_list_t *c = NULL;
3426 
3427             LOG_DEBUG(LOGCALLBACK(zh), "Processing WATCHER_EVENT");
3428 
3429             deserialize_WatcherEvent(ia, "event", &evt);
3430             type = evt.type;
3431             path = evt.path;
3432             /* We are doing a notification, so there is no pending request */
3433             c = create_completion_entry(zh, WATCHER_EVENT_XID,-1,0,0,0,0);
3434             c->buffer = bptr;
3435             lock_watchers(zh);
3436             c->c.watcher_result = collectWatchers(zh, type, path);
3437             unlock_watchers(zh);
3438 
3439             // We cannot free until now, otherwise path will become invalid
3440             deallocate_WatcherEvent(&evt);
3441             queue_completion(&zh->completions_to_process, c, 0);
3442         } else if (hdr.xid == SET_WATCHES_XID) {
3443             LOG_DEBUG(LOGCALLBACK(zh), "Processing SET_WATCHES");
3444             free_buffer(bptr);
3445         } else if (hdr.xid == AUTH_XID){
3446             LOG_DEBUG(LOGCALLBACK(zh), "Processing AUTH_XID");
3447 
3448             /* special handling for the AUTH response as it may come back
3449              * out-of-band */
3450             auth_completion_func(hdr.err,zh);
3451             free_buffer(bptr);
3452             /* authentication completion may change the connection state to
3453              * unrecoverable */
3454             if(is_unrecoverable(zh)){
3455                 handle_error(zh, ZAUTHFAILED);
3456                 close_buffer_iarchive(&ia);
3457                 return api_epilog(zh, ZAUTHFAILED);
3458             }
3459         } else {
3460             int rc = hdr.err;
3461             /* Find the request corresponding to the response */
3462             completion_list_t *cptr = dequeue_completion(&zh->sent_requests);
3463 
3464             /* [ZOOKEEPER-804] Don't assert if zookeeper_close has been called. */
3465             if (zh->close_requested == 1 && cptr == NULL) {
3466                 LOG_DEBUG(LOGCALLBACK(zh), "Completion queue has been cleared by zookeeper_close()");
3467                 close_buffer_iarchive(&ia);
3468                 free_buffer(bptr);
3469                 return api_epilog(zh,ZINVALIDSTATE);
3470             }
3471             assert(cptr);
3472             /* The requests are going to come back in order */
3473             if (cptr->xid != hdr.xid) {
3474                 LOG_DEBUG(LOGCALLBACK(zh), "Processing unexpected or out-of-order response!");
3475 
3476                 // received unexpected (or out-of-order) response
3477                 close_buffer_iarchive(&ia);
3478                 free_buffer(bptr);
3479                 // put the completion back on the queue (so it gets properly
3480                 // signaled and deallocated) and disconnect from the server
3481                 queue_completion(&zh->sent_requests,cptr,1);
3482                 return api_epilog(zh,
3483                                   handle_socket_error_msg(zh, __LINE__,ZRUNTIMEINCONSISTENCY,
3484                                   "unexpected server response: expected %#x, but received %#x",
3485                                   hdr.xid,cptr->xid));
3486             }
3487 
3488             if (hdr.zxid > 0) {
3489                 // Update last_zxid only when it is a request response
3490                 zh->last_zxid = hdr.zxid;
3491             }
3492             lock_watchers(zh);
3493             activateWatcher(zh, cptr->watcher, rc);
3494             deactivateWatcher(zh, cptr->watcher_deregistration, rc);
3495             unlock_watchers(zh);
3496 
3497             if (cptr->c.void_result != SYNCHRONOUS_MARKER) {
3498                 LOG_DEBUG(LOGCALLBACK(zh), "Queueing asynchronous response");
3499                 cptr->buffer = bptr;
3500                 queue_completion(&zh->completions_to_process, cptr, 0);
3501             } else {
3502 #ifdef THREADED
3503                 struct sync_completion
3504                         *sc = (struct sync_completion*)cptr->data;
3505                 sc->rc = rc;
3506 
3507                 process_sync_completion(zh, cptr, sc, ia);
3508 
3509                 notify_sync_completion(sc);
3510                 free_buffer(bptr);
3511                 zh->outstanding_sync--;
3512                 destroy_completion_entry(cptr);
3513 #else
3514                 abort_singlethreaded(zh);
3515 #endif
3516             }
3517         }
3518 
3519         close_buffer_iarchive(&ia);
3520 
3521     }
3522     if (process_async(zh->outstanding_sync)) {
3523         process_completions(zh);
3524     }
3525 
3526     return api_epilog(zh, ZOK);
3527 }
3528 
3529 int zoo_state(zhandle_t *zh)
3530 {
3531     if(zh!=0)
3532         return zh->state;
3533     return 0;
3534 }
3535 
3536 static watcher_registration_t* create_watcher_registration(const char* path,
3537         result_checker_fn checker,watcher_fn watcher,void* ctx){
3538     watcher_registration_t* wo;
3539     if(watcher==0)
3540         return 0;
3541     wo=calloc(1,sizeof(watcher_registration_t));
3542     wo->path=strdup(path);
3543     wo->watcher=watcher;
3544     wo->context=ctx;
3545     wo->checker=checker;
3546     return wo;
3547 }
3548 
3549 static watcher_deregistration_t* create_watcher_deregistration(const char* path,
3550         watcher_fn watcher, void *watcherCtx, ZooWatcherType wtype) {
3551     watcher_deregistration_t *wdo;
3552 
3553     wdo = calloc(1, sizeof(watcher_deregistration_t));
3554     if (!wdo) {
3555       return NULL;
3556     }
3557     wdo->path = strdup(path);
3558     wdo->watcher = watcher;
3559     wdo->context = watcherCtx;
3560     wdo->type = wtype;
3561     return wdo;
3562 }
3563 
3564 static void destroy_watcher_registration(watcher_registration_t* wo){
3565     if(wo!=0){
3566         free((void*)wo->path);
3567         free(wo);
3568     }
3569 }
3570 
3571 static void destroy_watcher_deregistration(watcher_deregistration_t *wdo) {
3572     if (wdo) {
3573         free((void *)wdo->path);
3574         free(wdo);
3575     }
3576 }
3577 
3578 static completion_list_t* create_completion_entry(zhandle_t *zh, int xid, int completion_type,
3579         const void *dc, const void *data,watcher_registration_t* wo, completion_head_t *clist)
3580 {
3581     return do_create_completion_entry(zh, xid, completion_type, dc, data, wo,
3582                                       clist, NULL);
3583 }
3584 
3585 static completion_list_t* create_completion_entry_deregistration(zhandle_t *zh,
3586         int xid, int completion_type, const void *dc, const void *data,
3587         watcher_deregistration_t* wdo, completion_head_t *clist)
3588 {
3589     return do_create_completion_entry(zh, xid, completion_type, dc, data, NULL,
3590                                       clist, wdo);
3591 }
3592 
3593 static completion_list_t* do_create_completion_entry(zhandle_t *zh, int xid,
3594         int completion_type, const void *dc, const void *data,
3595         watcher_registration_t* wo, completion_head_t *clist,
3596         watcher_deregistration_t* wdo)
3597 {
3598     completion_list_t *c = calloc(1, sizeof(completion_list_t));
3599     if (!c) {
3600         LOG_ERROR(LOGCALLBACK(zh), "out of memory");
3601         return 0;
3602     }
3603     c->c.type = completion_type;
3604     c->data = data;
3605     switch(c->c.type) {
3606     case COMPLETION_VOID:
3607         c->c.void_result = (void_completion_t)dc;
3608         break;
3609     case COMPLETION_STRING:
3610         c->c.string_result = (string_completion_t)dc;
3611         break;
3612     case COMPLETION_DATA:
3613         c->c.data_result = (data_completion_t)dc;
3614         break;
3615     case COMPLETION_STAT:
3616         c->c.stat_result = (stat_completion_t)dc;
3617         break;
3618     case COMPLETION_STRINGLIST:
3619         c->c.strings_result = (strings_completion_t)dc;
3620         break;
3621     case COMPLETION_STRINGLIST_STAT:
3622         c->c.strings_stat_result = (strings_stat_completion_t)dc;
3623         break;
3624     case COMPLETION_STRING_STAT:
3625         c->c.string_stat_result = (string_stat_completion_t)dc;
3626     case COMPLETION_ACLLIST:
3627         c->c.acl_result = (acl_completion_t)dc;
3628         break;
3629     case COMPLETION_MULTI:
3630         assert(clist);
3631         c->c.void_result = (void_completion_t)dc;
3632         c->c.clist = *clist;
3633         break;
3634     }
3635     c->xid = xid;
3636     c->watcher = wo;
3637     c->watcher_deregistration = wdo;
3638 
3639     return c;
3640 }
3641 
3642 static void destroy_completion_entry(completion_list_t* c){
3643     if(c!=0){
3644         destroy_watcher_registration(c->watcher);
3645         destroy_watcher_deregistration(c->watcher_deregistration);
3646         if(c->buffer!=0)
3647             free_buffer(c->buffer);
3648         free(c);
3649     }
3650 }
3651 
3652 static void queue_completion_nolock(completion_head_t *list,
3653                                     completion_list_t *c,
3654                                     int add_to_front)
3655 {
3656     c->next = 0;
3657     /* appending a new entry to the back of the list */
3658     if (list->last) {
3659         assert(list->head);
3660         // List is not empty
3661         if (!add_to_front) {
3662             list->last->next = c;
3663             list->last = c;
3664         } else {
3665             c->next = list->head;
3666             list->head = c;
3667         }
3668     } else {
3669         // List is empty
3670         assert(!list->head);
3671         list->head = c;
3672         list->last = c;
3673     }
3674 }
3675 
3676 static void queue_completion(completion_head_t *list, completion_list_t *c,
3677         int add_to_front)
3678 {
3679 
3680     lock_completion_list(list);
3681     queue_completion_nolock(list, c, add_to_front);
3682     unlock_completion_list(list);
3683 }
3684 
3685 static int add_completion(zhandle_t *zh, int xid, int completion_type,
3686         const void *dc, const void *data, int add_to_front,
3687         watcher_registration_t* wo, completion_head_t *clist)
3688 {
3689     completion_list_t *c =create_completion_entry(zh, xid, completion_type, dc,
3690             data, wo, clist);
3691     return do_add_completion(zh, dc, c, add_to_front);
3692 }
3693 
3694 static int add_completion_deregistration(zhandle_t *zh, int xid,
3695         int completion_type, const void *dc, const void *data, int add_to_front,
3696         watcher_deregistration_t* wdo, completion_head_t *clist)
3697 {
3698     completion_list_t *c = create_completion_entry_deregistration(zh, xid,
3699            completion_type, dc, data, wdo, clist);
3700     return do_add_completion(zh, dc, c, add_to_front);
3701 }
3702 
3703 static int do_add_completion(zhandle_t *zh, const void *dc,
3704         completion_list_t *c, int add_to_front)
3705 {
3706     int rc = 0;
3707     if (!c)
3708         return ZSYSTEMERROR;
3709     lock_completion_list(&zh->sent_requests);
3710     if (zh->close_requested != 1) {
3711         queue_completion_nolock(&zh->sent_requests, c, add_to_front);
3712         if (dc == SYNCHRONOUS_MARKER) {
3713             zh->outstanding_sync++;
3714         }
3715         rc = ZOK;
3716     } else {
3717         free(c);
3718         rc = ZINVALIDSTATE;
3719     }
3720     unlock_completion_list(&zh->sent_requests);
3721     return rc;
3722 }
3723 
3724 static int add_data_completion(zhandle_t *zh, int xid, data_completion_t dc,
3725         const void *data,watcher_registration_t* wo)
3726 {
3727     return add_completion(zh, xid, COMPLETION_DATA, dc, data, 0, wo, 0);
3728 }
3729 
3730 static int add_stat_completion(zhandle_t *zh, int xid, stat_completion_t dc,
3731         const void *data,watcher_registration_t* wo)
3732 {
3733     return add_completion(zh, xid, COMPLETION_STAT, dc, data, 0, wo, 0);
3734 }
3735 
3736 static int add_strings_completion(zhandle_t *zh, int xid,
3737         strings_completion_t dc, const void *data,watcher_registration_t* wo)
3738 {
3739     return add_completion(zh, xid, COMPLETION_STRINGLIST, dc, data, 0, wo, 0);
3740 }
3741 
3742 static int add_strings_stat_completion(zhandle_t *zh, int xid,
3743         strings_stat_completion_t dc, const void *data,watcher_registration_t* wo)
3744 {
3745     return add_completion(zh, xid, COMPLETION_STRINGLIST_STAT, dc, data, 0, wo, 0);
3746 }
3747 
3748 static int add_acl_completion(zhandle_t *zh, int xid, acl_completion_t dc,
3749         const void *data)
3750 {
3751     return add_completion(zh, xid, COMPLETION_ACLLIST, dc, data, 0, 0, 0);
3752 }
3753 
3754 static int add_void_completion(zhandle_t *zh, int xid, void_completion_t dc,
3755         const void *data)
3756 {
3757     return add_completion(zh, xid, COMPLETION_VOID, dc, data, 0, 0, 0);
3758 }
3759 
3760 static int add_string_completion(zhandle_t *zh, int xid,
3761         string_completion_t dc, const void *data)
3762 {
3763     return add_completion(zh, xid, COMPLETION_STRING, dc, data, 0, 0, 0);
3764 }
3765 
3766 static int add_string_stat_completion(zhandle_t *zh, int xid,
3767         string_stat_completion_t dc, const void *data)
3768 {
3769     return add_completion(zh, xid, COMPLETION_STRING_STAT, dc, data, 0, 0, 0);
3770 }
3771 
3772 static int add_multi_completion(zhandle_t *zh, int xid, void_completion_t dc,
3773         const void *data, completion_head_t *clist)
3774 {
3775     return add_completion(zh, xid, COMPLETION_MULTI, dc, data, 0,0, clist);
3776 }
3777 
3778 /**
3779  * After sending the close request, we are waiting for a given millisecs for
3780  * getting the answer and/or for the socket to be closed by the server.
3781  *
3782  * This function should not be called while we still want to process
3783  * any response from the server. It must be called after adaptor_finish called,
3784  * in order not to mess with the I/O receiver thread in multi-threaded mode.
3785  */
3786 int wait_for_session_to_be_closed(zhandle_t *zh, int timeout_ms)
3787 {
3788     int ret = 0;
3789 #ifndef WIN32
3790     struct pollfd fd_s[1];
3791 #else
3792     fd_set rfds;
3793     struct timeval waittime = {timeout_ms / 1000, (timeout_ms % 1000) * 1000};
3794 #endif
3795 
3796     if (zh == NULL) {
3797         return ZBADARGUMENTS;
3798     }
3799 
3800 #ifndef WIN32
3801     fd_s[0].fd = zh->fd->sock;
3802     fd_s[0].events = POLLIN;
3803     ret = poll(fd_s, 1, timeout_ms);
3804 #else
3805     FD_ZERO(&rfds);
3806     FD_SET(zh->fd->sock , &rfds);
3807     ret = select(zh->fd->sock + 1, &rfds, NULL, NULL, &waittime);
3808 #endif
3809 
3810     if (ret == 0){
3811         LOG_WARN(LOGCALLBACK(zh), "Timed out (%dms) during waiting for server's reply after sending a close request, sessionId=%#llx\n",
3812             timeout_ms, zh->client_id.client_id);
3813     } else if (ret < 0) {
3814         LOG_WARN(LOGCALLBACK(zh), "System error (%d) happened while waiting for server's reply, sessionId=%#llx\n",
3815             ret, zh->client_id.client_id);
3816     }
3817 
3818     return ZOK;
3819 }
3820 
3821 int zookeeper_close(zhandle_t *zh)
3822 {
3823     int rc=ZOK;
3824     if (zh==0)
3825         return ZBADARGUMENTS;
3826 
3827     zh->close_requested=1;
3828     if (inc_ref_counter(zh,1)>1) {
3829         /* We have incremented the ref counter to prevent the
3830          * completions from calling zookeeper_close before we have
3831          * completed the adaptor_finish call below. */
3832 
3833     /* Signal any syncronous completions before joining the threads */
3834         enter_critical(zh);
3835         free_completions(zh,1,ZCLOSING);
3836         leave_critical(zh);
3837 
3838         adaptor_finish(zh);
3839         /* Now we can allow the handle to be cleaned up, if the completion
3840          * threads finished during the adaptor_finish call. */
3841         api_epilog(zh, 0);
3842         return ZOK;
3843     }
3844     /* No need to decrement the counter since we're just going to
3845      * destroy the handle later. */
3846     if (is_connected(zh)) {
3847         struct oarchive *oa;
3848         struct RequestHeader h = {get_xid(), ZOO_CLOSE_OP};
3849         LOG_INFO(LOGCALLBACK(zh), "Closing zookeeper sessionId=%#llx to %s\n",
3850             zh->client_id.client_id, zoo_get_current_server(zh));
3851         oa = create_buffer_oarchive();
3852         rc = serialize_RequestHeader(oa, "header", &h);
3853         rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa), get_buffer_len(oa));
3854         /* We queued the buffer, so don't free it */
3855         close_buffer_oarchive(&oa, 0);
3856         if (rc < 0) {
3857             LOG_DEBUG(LOGCALLBACK(zh), "Error during closing zookeeper session, sessionId=%#llx to %s (error: %d)\n",
3858                 zh->client_id.client_id, zoo_get_current_server(zh), rc);
3859             rc = ZMARSHALLINGERROR;
3860         } else {
3861             /* make sure the close request is sent; we set timeout to an arbitrary
3862              * (but reasonable) number of milliseconds since we want the call to block*/
3863             rc = adaptor_send_queue(zh, 3000);
3864 
3865             /* give some time to the server to process the session close request properly */
3866             rc = rc < 0 ? rc : wait_for_session_to_be_closed(zh, 1500);
3867         }
3868     } else {
3869         rc = ZOK;
3870     }
3871 
3872     LOG_INFO(LOGCALLBACK(zh), "Freeing zookeeper resources for sessionId=%#llx\n", zh->client_id.client_id);
3873     destroy(zh);
3874     adaptor_destroy(zh);
3875     free(zh->fd);
3876     free(zh);
3877 #ifdef _WIN32
3878     Win32WSACleanup();
3879 #endif
3880     return rc;
3881 }
3882 
/**
 * Client-side syntax check of a znode path.
 *
 * A path is accepted when it
 *   - is non-NULL, non-empty and starts with '/';
 *   - does not end with '/' (unless it is the root, or `mode` is a
 *     sequence mode, where the server appends a suffix);
 *   - has no empty component ("//");
 *   - has no component equal to "." or ".." (a trailing "." / ".." is
 *     tolerated for sequence modes for the same reason as above);
 *   - contains no control character in the range 0x01..0x1F inclusive.
 *
 * @param path candidate path (may be NULL)
 * @param mode create mode, inspected only through ZOOKEEPER_IS_SEQUENCE
 * @return 1 when the path is acceptable, 0 otherwise.
 */
static int isValidPath(const char* path, const int mode) {
    int len = 0;
    char lastc = '/';
    char c;
    int i = 0;

    if (path == 0)
        return 0;
    len = strlen(path);
    if (len == 0)
        return 0;
    if (path[0] != '/')
        return 0;
    if (len == 1) /* done checking - it's the root */
        return 1;
    if (path[len - 1] == '/' && !ZOOKEEPER_IS_SEQUENCE(mode))
        return 0;

    for (i = 1; i < len; lastc = path[i], i++) {
        c = path[i];

        if (c == 0) {
            return 0;
        } else if (c == '/' && lastc == '/') {
            /* empty path component */
            return 0;
        } else if (c == '.' && lastc == '.') {
            /* ".." is only rejected when it is a whole component;
             * lastc == '.' implies i >= 2, so path[i-2] is in bounds */
            if (path[i-2] == '/' && (((i + 1 == len) && !ZOOKEEPER_IS_SEQUENCE(mode))
                                     || path[i+1] == '/')) {
                return 0;
            }
        } else if (c == '.') {
            /* "." is only rejected when it is a whole component */
            if ((path[i-1] == '/') && (((i + 1 == len) && !ZOOKEEPER_IS_SEQUENCE(mode))
                                       || path[i+1] == '/')) {
                return 0;
            }
        } else if (c > 0x00 && c <= 0x1f) {
            /* control characters 0x01..0x1F (inclusive) are not allowed */
            return 0;
        }
    }

    return 1;
}
3926 
3927 /*---------------------------------------------------------------------------*
3928  * REQUEST INIT HELPERS
3929  *---------------------------------------------------------------------------*/
3930 /* Common Request init helper functions to reduce code duplication */
3931 static int Request_path_init(zhandle_t *zh, int mode,
3932         char **path_out, const char *path)
3933 {
3934     assert(path_out);
3935 
3936     *path_out = prepend_string(zh, path);
3937     if (zh == NULL || !isValidPath(*path_out, mode)) {
3938         free_duplicate_path(*path_out, path);
3939         return ZBADARGUMENTS;
3940     }
3941     if (is_unrecoverable(zh)) {
3942         free_duplicate_path(*path_out, path);
3943         return ZINVALIDSTATE;
3944     }
3945 
3946     return ZOK;
3947 }
3948 
3949 static int Request_path_watch_init(zhandle_t *zh, int mode,
3950         char **path_out, const char *path,
3951         int32_t *watch_out, uint32_t watch)
3952 {
3953     int rc = Request_path_init(zh, mode, path_out, path);
3954     if (rc != ZOK) {
3955         return rc;
3956     }
3957     *watch_out = watch;
3958     return ZOK;
3959 }
3960 
3961 /*---------------------------------------------------------------------------*
3962  * ASYNC API
3963  *---------------------------------------------------------------------------*/
3964 int zoo_aget(zhandle_t *zh, const char *path, int watch, data_completion_t dc,
3965         const void *data)
3966 {
3967     return zoo_awget(zh,path,watch?zh->watcher:0,zh->context,dc,data);
3968 }
3969 
3970 int zoo_awget(zhandle_t *zh, const char *path,
3971         watcher_fn watcher, void* watcherCtx,
3972         data_completion_t dc, const void *data)
3973 {
3974     struct oarchive *oa;
3975     char *server_path = prepend_string(zh, path);
3976     struct RequestHeader h = {get_xid(), ZOO_GETDATA_OP};
3977     struct GetDataRequest req =  { (char*)server_path, watcher!=0 };
3978     int rc;
3979 
3980     if (zh==0 || !isValidPath(server_path, 0)) {
3981         free_duplicate_path(server_path, path);
3982         return ZBADARGUMENTS;
3983     }
3984     if (is_unrecoverable(zh)) {
3985         free_duplicate_path(server_path, path);
3986         return ZINVALIDSTATE;
3987     }
3988     oa=create_buffer_oarchive();
3989     rc = serialize_RequestHeader(oa, "header", &h);
3990     rc = rc < 0 ? rc : serialize_GetDataRequest(oa, "req", &req);
3991     enter_critical(zh);
3992     rc = rc < 0 ? rc : add_data_completion(zh, h.xid, dc, data,
3993     create_watcher_registration(server_path,data_result_checker,watcher,watcherCtx));
3994     rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
3995             get_buffer_len(oa));
3996     leave_critical(zh);
3997     free_duplicate_path(server_path, path);
3998     /* We queued the buffer, so don't free it */
3999     close_buffer_oarchive(&oa, 0);
4000 
4001     LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
4002             zoo_get_current_server(zh));
4003     /* make a best (non-blocking) effort to send the requests asap */
4004     adaptor_send_queue(zh, 0);
4005     return (rc < 0)?ZMARSHALLINGERROR:ZOK;
4006 }
4007 
4008 int zoo_agetconfig(zhandle_t *zh, int watch, data_completion_t dc,
4009         const void *data)
4010 {
4011     return zoo_awgetconfig(zh,watch?zh->watcher:0,zh->context,dc,data);
4012 }
4013 
4014 int zoo_awgetconfig(zhandle_t *zh, watcher_fn watcher, void* watcherCtx,
4015         data_completion_t dc, const void *data)
4016 {
4017     struct oarchive *oa;
4018     char *path = ZOO_CONFIG_NODE;
4019     char *server_path = ZOO_CONFIG_NODE;
4020     struct RequestHeader h = { get_xid(), ZOO_GETDATA_OP };
4021     struct GetDataRequest req =  { (char*)server_path, watcher!=0 };
4022     int rc;
4023 
4024     if (zh==0 || !isValidPath(server_path, 0)) {
4025         free_duplicate_path(server_path, path);
4026         return ZBADARGUMENTS;
4027     }
4028     if (is_unrecoverable(zh)) {
4029         free_duplicate_path(server_path, path);
4030         return ZINVALIDSTATE;
4031     }
4032     oa=create_buffer_oarchive();
4033     rc = serialize_RequestHeader(oa, "header", &h);
4034     rc = rc < 0 ? rc : serialize_GetDataRequest(oa, "req", &req);
4035     enter_critical(zh);
4036     rc = rc < 0 ? rc : add_data_completion(zh, h.xid, dc, data,
4037                                            create_watcher_registration(server_path,data_result_checker,watcher,watcherCtx));
4038     rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
4039                                           get_buffer_len(oa));
4040     leave_critical(zh);
4041     free_duplicate_path(server_path, path);
4042     /* We queued the buffer, so don't free it */
4043     close_buffer_oarchive(&oa, 0);
4044 
4045     LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
4046                zoo_get_current_server(zh));
4047     /* make a best (non-blocking) effort to send the requests asap */
4048     adaptor_send_queue(zh, 0);
4049     return (rc < 0)?ZMARSHALLINGERROR:ZOK;
4050 }
4051 
4052 int zoo_areconfig(zhandle_t *zh, const char *joining, const char *leaving,
4053        const char *members, int64_t version, data_completion_t dc, const void *data)
4054 {
4055     struct oarchive *oa;
4056     struct RequestHeader h = { get_xid(), ZOO_RECONFIG_OP };
4057     struct ReconfigRequest req;
4058    int rc = 0;
4059 
4060     if (zh==0) {
4061         return ZBADARGUMENTS;
4062     }
4063     if (is_unrecoverable(zh)) {
4064         return ZINVALIDSTATE;
4065     }
4066 
4067    oa=create_buffer_oarchive();
4068    req.joiningServers = (char *)joining;
4069    req.leavingServers = (char *)leaving;
4070    req.newMembers = (char *)members;
4071    req.curConfigId = version;
4072     rc = serialize_RequestHeader(oa, "header", &h);
4073    rc = rc < 0 ? rc : serialize_ReconfigRequest(oa, "req", &req);
4074     enter_critical(zh);
4075     rc = rc < 0 ? rc : add_data_completion(zh, h.xid, dc, data, NULL);
4076     rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
4077             get_buffer_len(oa));
4078     leave_critical(zh);
4079     /* We queued the buffer, so don't free it */
4080     close_buffer_oarchive(&oa, 0);
4081 
4082     LOG_DEBUG(LOGCALLBACK(zh), "Sending Reconfig request xid=%#x to %s",h.xid, zoo_get_current_server(zh));
4083     /* make a best (non-blocking) effort to send the requests asap */
4084     adaptor_send_queue(zh, 0);
4085 
4086     return (rc < 0)?ZMARSHALLINGERROR:ZOK;
4087 }
4088 
4089 static int SetDataRequest_init(zhandle_t *zh, struct SetDataRequest *req,
4090         const char *path, const char *buffer, int buflen, int version)
4091 {
4092     int rc;
4093     assert(req);
4094     rc = Request_path_init(zh, 0, &req->path, path);
4095     if (rc != ZOK) {
4096         return rc;
4097     }
4098     req->data.buff = (char*)buffer;
4099     req->data.len = buflen;
4100     req->version = version;
4101 
4102     return ZOK;
4103 }
4104 
4105 int zoo_aset(zhandle_t *zh, const char *path, const char *buffer, int buflen,
4106         int version, stat_completion_t dc, const void *data)
4107 {
4108     struct oarchive *oa;
4109     struct RequestHeader h = {get_xid(), ZOO_SETDATA_OP};
4110     struct SetDataRequest req;
4111     int rc = SetDataRequest_init(zh, &req, path, buffer, buflen, version);
4112     if (rc != ZOK) {
4113         return rc;
4114     }
4115     oa = create_buffer_oarchive();
4116     rc = serialize_RequestHeader(oa, "header", &h);
4117     rc = rc < 0 ? rc : serialize_SetDataRequest(oa, "req", &req);
4118     enter_critical(zh);
4119     rc = rc < 0 ? rc : add_stat_completion(zh, h.xid, dc, data,0);
4120     rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
4121             get_buffer_len(oa));
4122     leave_critical(zh);
4123     free_duplicate_path(req.path, path);
4124     /* We queued the buffer, so don't free it */
4125     close_buffer_oarchive(&oa, 0);
4126 
4127     LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
4128             zoo_get_current_server(zh));
4129     /* make a best (non-blocking) effort to send the requests asap */
4130     adaptor_send_queue(zh, 0);
4131     return (rc < 0)?ZMARSHALLINGERROR:ZOK;
4132 }
4133 
4134 static int CreateRequest_init(zhandle_t *zh, struct CreateRequest *req,
4135         const char *path, const char *value,
4136         int valuelen, const struct ACL_vector *acl_entries, int mode)
4137 {
4138     int rc;
4139     assert(req);
4140     rc = Request_path_init(zh, mode, &req->path, path);
4141     assert(req);
4142     if (rc != ZOK) {
4143         return rc;
4144     }
4145     req->flags = mode;
4146     req->data.buff = (char*)value;
4147     req->data.len = valuelen;
4148     if (acl_entries == 0) {
4149         req->acl.count = 0;
4150         req->acl.data = 0;
4151     } else {
4152         req->acl = *acl_entries;
4153     }
4154 
4155     return ZOK;
4156 }
4157 
4158 static int CreateTTLRequest_init(zhandle_t *zh, struct CreateTTLRequest *req,
4159         const char *path, const char *value,
4160         int valuelen, const struct ACL_vector *acl_entries, int mode, int64_t ttl)
4161 {
4162     int rc;
4163     assert(req);
4164     rc = Request_path_init(zh, mode, &req->path, path);
4165     assert(req);
4166     if (rc != ZOK) {
4167         return rc;
4168     }
4169     req->flags = mode;
4170     req->data.buff = (char*)value;
4171     req->data.len = valuelen;
4172     if (acl_entries == 0) {
4173         req->acl.count = 0;
4174         req->acl.data = 0;
4175     } else {
4176         req->acl = *acl_entries;
4177     }
4178     req->ttl = ttl;
4179 
4180     return ZOK;
4181 }
4182 
4183 static int get_create_op_type(int mode, int default_op) {
4184     if (mode == ZOO_CONTAINER) {
4185         return ZOO_CREATE_CONTAINER_OP;
4186     } else if (ZOOKEEPER_IS_TTL(mode)) {
4187         return ZOO_CREATE_TTL_OP;
4188     } else {
4189         return default_op;
4190     }
4191 }
4192 
4193 int zoo_acreate(zhandle_t *zh, const char *path, const char *value,
4194         int valuelen, const struct ACL_vector *acl_entries, int mode,
4195         string_completion_t completion, const void *data)
4196 {
4197     return zoo_acreate_ttl(zh, path, value, valuelen, acl_entries, mode, -1, completion, data);
4198 }
4199 
4200 int zoo_acreate_ttl(zhandle_t *zh, const char *path, const char *value,
4201         int valuelen, const struct ACL_vector *acl_entries, int mode, int64_t ttl,
4202         string_completion_t completion, const void *data)
4203 {
4204     struct oarchive *oa;
4205     struct RequestHeader h = {get_xid(), get_create_op_type(mode, ZOO_CREATE_OP)};
4206     int rc;
4207     char *req_path;
4208 
4209     if (ZOOKEEPER_IS_TTL(mode)) {
4210         struct CreateTTLRequest req;
4211 
4212         if (ttl <= 0 || ttl > ZOO_MAX_TTL) {
4213             return ZBADARGUMENTS;
4214         }
4215 
4216         rc = CreateTTLRequest_init(zh, &req,
4217                 path, value, valuelen, acl_entries, mode, ttl);
4218         if (rc != ZOK) {
4219             return rc;
4220         }
4221         oa = create_buffer_oarchive();
4222         rc = serialize_RequestHeader(oa, "header", &h);
4223         rc = rc < 0 ? rc : serialize_CreateTTLRequest(oa, "req", &req);
4224 
4225         req_path = req.path;
4226     } else {
4227         struct CreateRequest req;
4228 
4229         if (ttl >= 0) {
4230             return ZBADARGUMENTS;
4231         }
4232 
4233         rc = CreateRequest_init(zh, &req,
4234                 path, value, valuelen, acl_entries, mode);
4235         if (rc != ZOK) {
4236             return rc;
4237         }
4238         oa = create_buffer_oarchive();
4239         rc = serialize_RequestHeader(oa, "header", &h);
4240         rc = rc < 0 ? rc : serialize_CreateRequest(oa, "req", &req);
4241 
4242         req_path = req.path;
4243     }
4244 
4245     enter_critical(zh);
4246     rc = rc < 0 ? rc : add_string_completion(zh, h.xid, completion, data);
4247     rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
4248             get_buffer_len(oa));
4249     leave_critical(zh);
4250     free_duplicate_path(req_path, path);
4251     /* We queued the buffer, so don't free it */
4252     close_buffer_oarchive(&oa, 0);
4253 
4254     LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
4255             zoo_get_current_server(zh));
4256     /* make a best (non-blocking) effort to send the requests asap */
4257     adaptor_send_queue(zh, 0);
4258     return (rc < 0)?ZMARSHALLINGERROR:ZOK;
4259 }
4260 
4261 int zoo_acreate2(zhandle_t *zh, const char *path, const char *value,
4262         int valuelen, const struct ACL_vector *acl_entries, int mode,
4263         string_stat_completion_t completion, const void *data)
4264 {
4265     return zoo_acreate2_ttl(zh, path, value, valuelen, acl_entries, mode, -1, completion, data);
4266 }
4267 
4268 int zoo_acreate2_ttl(zhandle_t *zh, const char *path, const char *value,
4269         int valuelen, const struct ACL_vector *acl_entries, int mode, int64_t ttl,
4270         string_stat_completion_t completion, const void *data)
4271 {
4272     struct oarchive *oa;
4273     struct RequestHeader h = { get_xid(), get_create_op_type(mode, ZOO_CREATE2_OP) };
4274     int rc;
4275     char *req_path;
4276 
4277     if (ZOOKEEPER_IS_TTL(mode)) {
4278         struct CreateTTLRequest req;
4279 
4280         if (ttl <= 0 || ttl > ZOO_MAX_TTL) {
4281             return ZBADARGUMENTS;
4282         }
4283 
4284         rc = CreateTTLRequest_init(zh, &req,
4285                 path, value, valuelen, acl_entries, mode, ttl);
4286         if (rc != ZOK) {
4287             return rc;
4288         }
4289         oa = create_buffer_oarchive();
4290         rc = serialize_RequestHeader(oa, "header", &h);
4291         rc = rc < 0 ? rc : serialize_CreateTTLRequest(oa, "req", &req);
4292 
4293         req_path = req.path;
4294     } else {
4295         struct CreateRequest req;
4296 
4297         if (ttl >= 0) {
4298             return ZBADARGUMENTS;
4299         }
4300 
4301         rc = CreateRequest_init(zh, &req, path, value, valuelen, acl_entries, mode);
4302         if (rc != ZOK) {
4303             return rc;
4304         }
4305         oa = create_buffer_oarchive();
4306         rc = serialize_RequestHeader(oa, "header", &h);
4307         rc = rc < 0 ? rc : serialize_CreateRequest(oa, "req", &req);
4308 
4309         req_path = req.path;
4310     }
4311 
4312     enter_critical(zh);
4313     rc = rc < 0 ? rc : add_string_stat_completion(zh, h.xid, completion, data);
4314     rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
4315             get_buffer_len(oa));
4316     leave_critical(zh);
4317     free_duplicate_path(req_path, path);
4318     /* We queued the buffer, so don't free it */
4319     close_buffer_oarchive(&oa, 0);
4320 
4321     LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
4322             zoo_get_current_server(zh));
4323     /* make a best (non-blocking) effort to send the requests asap */
4324     adaptor_send_queue(zh, 0);
4325     return (rc < 0)?ZMARSHALLINGERROR:ZOK;
4326 }
4327 
4328 int DeleteRequest_init(zhandle_t *zh, struct DeleteRequest *req,
4329         const char *path, int version)
4330 {
4331     int rc = Request_path_init(zh, 0, &req->path, path);
4332     if (rc != ZOK) {
4333         return rc;
4334     }
4335     req->version = version;
4336     return ZOK;
4337 }
4338 
4339 int zoo_adelete(zhandle_t *zh, const char *path, int version,
4340         void_completion_t completion, const void *data)
4341 {
4342     struct oarchive *oa;
4343     struct RequestHeader h = {get_xid(), ZOO_DELETE_OP};
4344     struct DeleteRequest req;
4345     int rc = DeleteRequest_init(zh, &req, path, version);
4346     if (rc != ZOK) {
4347         return rc;
4348     }
4349     oa = create_buffer_oarchive();
4350     rc = serialize_RequestHeader(oa, "header", &h);
4351     rc = rc < 0 ? rc : serialize_DeleteRequest(oa, "req", &req);
4352     enter_critical(zh);
4353     rc = rc < 0 ? rc : add_void_completion(zh, h.xid, completion, data);
4354     rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
4355             get_buffer_len(oa));
4356     leave_critical(zh);
4357     free_duplicate_path(req.path, path);
4358     /* We queued the buffer, so don't free it */
4359     close_buffer_oarchive(&oa, 0);
4360 
4361     LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
4362             zoo_get_current_server(zh));
4363     /* make a best (non-blocking) effort to send the requests asap */
4364     adaptor_send_queue(zh, 0);
4365     return (rc < 0)?ZMARSHALLINGERROR:ZOK;
4366 }
4367 
4368 int zoo_aexists(zhandle_t *zh, const char *path, int watch,
4369         stat_completion_t sc, const void *data)
4370 {
4371     return zoo_awexists(zh,path,watch?zh->watcher:0,zh->context,sc,data);
4372 }
4373 
4374 int zoo_awexists(zhandle_t *zh, const char *path,
4375         watcher_fn watcher, void* watcherCtx,
4376         stat_completion_t completion, const void *data)
4377 {
4378     struct oarchive *oa;
4379     struct RequestHeader h = {get_xid(), ZOO_EXISTS_OP};
4380     struct ExistsRequest req;
4381     int rc = Request_path_watch_init(zh, 0, &req.path, path,
4382             &req.watch, watcher != NULL);
4383     if (rc != ZOK) {
4384         return rc;
4385     }
4386     oa = create_buffer_oarchive();
4387     rc = serialize_RequestHeader(oa, "header", &h);
4388     rc = rc < 0 ? rc : serialize_ExistsRequest(oa, "req", &req);
4389     enter_critical(zh);
4390     rc = rc < 0 ? rc : add_stat_completion(zh, h.xid, completion, data,
4391         create_watcher_registration(req.path,exists_result_checker,
4392                 watcher,watcherCtx));
4393     rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
4394             get_buffer_len(oa));
4395     leave_critical(zh);
4396     free_duplicate_path(req.path, path);
4397     /* We queued the buffer, so don't free it */
4398     close_buffer_oarchive(&oa, 0);
4399 
4400     LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
4401             zoo_get_current_server(zh));
4402     /* make a best (non-blocking) effort to send the requests asap */
4403     adaptor_send_queue(zh, 0);
4404     return (rc < 0)?ZMARSHALLINGERROR:ZOK;
4405 }
4406 
4407 static int zoo_awget_children_(zhandle_t *zh, const char *path,
4408          watcher_fn watcher, void* watcherCtx,
4409          strings_completion_t sc,
4410          const void *data)
4411 {
4412     struct oarchive *oa;
4413     struct RequestHeader h = {get_xid(), ZOO_GETCHILDREN_OP};
4414     struct GetChildrenRequest req ;
4415     int rc = Request_path_watch_init(zh, 0, &req.path, path,
4416             &req.watch, watcher != NULL);
4417     if (rc != ZOK) {
4418         return rc;
4419     }
4420     oa = create_buffer_oarchive();
4421     rc = serialize_RequestHeader(oa, "header", &h);
4422     rc = rc < 0 ? rc : serialize_GetChildrenRequest(oa, "req", &req);
4423     enter_critical(zh);
4424     rc = rc < 0 ? rc : add_strings_completion(zh, h.xid, sc, data,
4425             create_watcher_registration(req.path,child_result_checker,watcher,watcherCtx));
4426     rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
4427             get_buffer_len(oa));
4428     leave_critical(zh);
4429     free_duplicate_path(req.path, path);
4430     /* We queued the buffer, so don't free it */
4431     close_buffer_oarchive(&oa, 0);
4432 
4433     LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
4434             zoo_get_current_server(zh));
4435     /* make a best (non-blocking) effort to send the requests asap */
4436     adaptor_send_queue(zh, 0);
4437     return (rc < 0)?ZMARSHALLINGERROR:ZOK;
4438 }
4439 
4440 int zoo_aget_children(zhandle_t *zh, const char *path, int watch,
4441         strings_completion_t dc, const void *data)
4442 {
4443     return zoo_awget_children_(zh,path,watch?zh->watcher:0,zh->context,dc,data);
4444 }
4445 
4446 int zoo_awget_children(zhandle_t *zh, const char *path,
4447          watcher_fn watcher, void* watcherCtx,
4448          strings_completion_t dc,
4449          const void *data)
4450 {
4451     return zoo_awget_children_(zh,path,watcher,watcherCtx,dc,data);
4452 }
4453 
4454 static int zoo_awget_children2_(zhandle_t *zh, const char *path,
4455          watcher_fn watcher, void* watcherCtx,
4456          strings_stat_completion_t ssc,
4457          const void *data)
4458 {
4459     /* invariant: (sc == NULL) != (sc == NULL) */
4460     struct oarchive *oa;
4461     struct RequestHeader h = {get_xid(), ZOO_GETCHILDREN2_OP};
4462     struct GetChildren2Request req ;
4463     int rc = Request_path_watch_init(zh, 0, &req.path, path,
4464             &req.watch, watcher != NULL);
4465     if (rc != ZOK) {
4466         return rc;
4467     }
4468     oa = create_buffer_oarchive();
4469     rc = serialize_RequestHeader(oa, "header", &h);
4470     rc = rc < 0 ? rc : serialize_GetChildren2Request(oa, "req", &req);
4471     enter_critical(zh);
4472     rc = rc < 0 ? rc : add_strings_stat_completion(zh, h.xid, ssc, data,
4473             create_watcher_registration(req.path,child_result_checker,watcher,watcherCtx));
4474     rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
4475             get_buffer_len(oa));
4476     leave_critical(zh);
4477     free_duplicate_path(req.path, path);
4478     /* We queued the buffer, so don't free it */
4479     close_buffer_oarchive(&oa, 0);
4480 
4481     LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
4482             zoo_get_current_server(zh));
4483     /* make a best (non-blocking) effort to send the requests asap */
4484     adaptor_send_queue(zh, 0);
4485     return (rc < 0)?ZMARSHALLINGERROR:ZOK;
4486 }
4487 
4488 int zoo_aget_children2(zhandle_t *zh, const char *path, int watch,
4489         strings_stat_completion_t dc, const void *data)
4490 {
4491     return zoo_awget_children2_(zh,path,watch?zh->watcher:0,zh->context,dc,data);
4492 }
4493 
4494 int zoo_awget_children2(zhandle_t *zh, const char *path,
4495          watcher_fn watcher, void* watcherCtx,
4496          strings_stat_completion_t dc,
4497          const void *data)
4498 {
4499     return zoo_awget_children2_(zh,path,watcher,watcherCtx,dc,data);
4500 }
4501 
4502 int zoo_async(zhandle_t *zh, const char *path,
4503         string_completion_t completion, const void *data)
4504 {
4505     struct oarchive *oa;
4506     struct RequestHeader h = {get_xid(), ZOO_SYNC_OP};
4507     struct SyncRequest req;
4508     int rc = Request_path_init(zh, 0, &req.path, path);
4509     if (rc != ZOK) {
4510         return rc;
4511     }
4512     oa = create_buffer_oarchive();
4513     rc = serialize_RequestHeader(oa, "header", &h);
4514     rc = rc < 0 ? rc : serialize_SyncRequest(oa, "req", &req);
4515     enter_critical(zh);
4516     rc = rc < 0 ? rc : add_string_completion(zh, h.xid, completion, data);
4517     rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
4518             get_buffer_len(oa));
4519     leave_critical(zh);
4520     free_duplicate_path(req.path, path);
4521     /* We queued the buffer, so don't free it */
4522     close_buffer_oarchive(&oa, 0);
4523 
4524     LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
4525             zoo_get_current_server(zh));
4526     /* make a best (non-blocking) effort to send the requests asap */
4527     adaptor_send_queue(zh, 0);
4528     return (rc < 0)?ZMARSHALLINGERROR:ZOK;
4529 }
4530 
4531 
4532 int zoo_aget_acl(zhandle_t *zh, const char *path, acl_completion_t completion,
4533         const void *data)
4534 {
4535     struct oarchive *oa;
4536     struct RequestHeader h = {get_xid(), ZOO_GETACL_OP};
4537     struct GetACLRequest req;
4538     int rc = Request_path_init(zh, 0, &req.path, path) ;
4539     if (rc != ZOK) {
4540         return rc;
4541     }
4542     oa = create_buffer_oarchive();
4543     rc = serialize_RequestHeader(oa, "header", &h);
4544     rc = rc < 0 ? rc : serialize_GetACLRequest(oa, "req", &req);
4545     enter_critical(zh);
4546     rc = rc < 0 ? rc : add_acl_completion(zh, h.xid, completion, data);
4547     rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
4548             get_buffer_len(oa));
4549     leave_critical(zh);
4550     free_duplicate_path(req.path, path);
4551     /* We queued the buffer, so don't free it */
4552     close_buffer_oarchive(&oa, 0);
4553 
4554     LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
4555             zoo_get_current_server(zh));
4556     /* make a best (non-blocking) effort to send the requests asap */
4557     adaptor_send_queue(zh, 0);
4558     return (rc < 0)?ZMARSHALLINGERROR:ZOK;
4559 }
4560 
4561 int zoo_aset_acl(zhandle_t *zh, const char *path, int version,
4562         struct ACL_vector *acl, void_completion_t completion, const void *data)
4563 {
4564     struct oarchive *oa;
4565     struct RequestHeader h = {get_xid(), ZOO_SETACL_OP};
4566     struct SetACLRequest req;
4567     int rc = Request_path_init(zh, 0, &req.path, path);
4568     if (rc != ZOK) {
4569         return rc;
4570     }
4571     oa = create_buffer_oarchive();
4572     req.acl = *acl;
4573     req.version = version;
4574     rc = serialize_RequestHeader(oa, "header", &h);
4575     rc = rc < 0 ? rc : serialize_SetACLRequest(oa, "req", &req);
4576     enter_critical(zh);
4577     rc = rc < 0 ? rc : add_void_completion(zh, h.xid, completion, data);
4578     rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
4579             get_buffer_len(oa));
4580     leave_critical(zh);
4581     free_duplicate_path(req.path, path);
4582     /* We queued the buffer, so don't free it */
4583     close_buffer_oarchive(&oa, 0);
4584 
4585     LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
4586             zoo_get_current_server(zh));
4587     /* make a best (non-blocking) effort to send the requests asap */
4588     adaptor_send_queue(zh, 0);
4589     return (rc < 0)?ZMARSHALLINGERROR:ZOK;
4590 }
4591 
4592 /* Completions for multi-op results */
4593 static void op_result_string_completion(int err, const char *value, const void *data)
4594 {
4595     struct zoo_op_result *result = (struct zoo_op_result *)data;
4596     assert(result);
4597     result->err = err;
4598 
4599     if (result->value && value) {
4600         int len = strlen(value) + 1;
4601         if (len > result->valuelen) {
4602             len = result->valuelen;
4603         }
4604         if (len > 0) {
4605             memcpy(result->value, value, len - 1);
4606             result->value[len - 1] = '\0';
4607         }
4608     } else {
4609         result->value = NULL;
4610     }
4611 }
4612 
4613 static void op_result_void_completion(int err, const void *data)
4614 {
4615     struct zoo_op_result *result = (struct zoo_op_result *)data;
4616     assert(result);
4617     result->err = err;
4618 }
4619 
4620 static void op_result_stat_completion(int err, const struct Stat *stat, const void *data)
4621 {
4622     struct zoo_op_result *result = (struct zoo_op_result *)data;
4623     assert(result);
4624     result->err = err;
4625 
4626     if (result->stat && err == 0 && stat) {
4627         *result->stat = *stat;
4628     } else {
4629         result->stat = NULL ;
4630     }
4631 }
4632 
4633 static int CheckVersionRequest_init(zhandle_t *zh, struct CheckVersionRequest *req,
4634         const char *path, int version)
4635 {
4636     int rc ;
4637     assert(req);
4638     rc = Request_path_init(zh, 0, &req->path, path);
4639     if (rc != ZOK) {
4640         return rc;
4641     }
4642     req->version = version;
4643 
4644     return ZOK;
4645 }
4646 
4647 int zoo_amulti(zhandle_t *zh, int count, const zoo_op_t *ops,
4648         zoo_op_result_t *results, void_completion_t completion, const void *data)
4649 {
4650     struct RequestHeader h = {get_xid(), ZOO_MULTI_OP};
4651     struct MultiHeader mh = {-1, 1, -1};
4652     struct oarchive *oa = create_buffer_oarchive();
4653     completion_head_t clist = { 0 };
4654 
4655     int rc = serialize_RequestHeader(oa, "header", &h);
4656 
4657     int index = 0;
4658     for (index=0; index < count; index++) {
4659         const zoo_op_t *op = ops+index;
4660         zoo_op_result_t *result = results+index;
4661         completion_list_t *entry = NULL;
4662 
4663         struct MultiHeader mh = {op->type, 0, -1};
4664         rc = rc < 0 ? rc : serialize_MultiHeader(oa, "multiheader", &mh);
4665 
4666         switch(op->type) {
4667             case ZOO_CREATE_CONTAINER_OP:
4668             case ZOO_CREATE_OP: {
4669                 struct CreateRequest req;
4670 
4671                 rc = rc < 0 ? rc : CreateRequest_init(zh, &req,
4672                                         op->create_op.path, op->create_op.data,
4673                                         op->create_op.datalen, op->create_op.acl,
4674                                         op->create_op.flags);
4675                 rc = rc < 0 ? rc : serialize_CreateRequest(oa, "req", &req);
4676                 result->value = op->create_op.buf;
4677                 result->valuelen = op->create_op.buflen;
4678 
4679                 enter_critical(zh);
4680                 entry = create_completion_entry(zh, h.xid, COMPLETION_STRING, op_result_string_completion, result, 0, 0);
4681                 leave_critical(zh);
4682                 free_duplicate_path(req.path, op->create_op.path);
4683                 break;
4684             }
4685 
4686             case ZOO_DELETE_OP: {
4687                 struct DeleteRequest req;
4688                 rc = rc < 0 ? rc : DeleteRequest_init(zh, &req, op->delete_op.path, op->delete_op.version);
4689                 rc = rc < 0 ? rc : serialize_DeleteRequest(oa, "req", &req);
4690 
4691                 enter_critical(zh);
4692                 entry = create_completion_entry(zh, h.xid, COMPLETION_VOID, op_result_void_completion, result, 0, 0);
4693                 leave_critical(zh);
4694                 free_duplicate_path(req.path, op->delete_op.path);
4695                 break;
4696             }
4697 
4698             case ZOO_SETDATA_OP: {
4699                 struct SetDataRequest req;
4700                 rc = rc < 0 ? rc : SetDataRequest_init(zh, &req,
4701                                         op->set_op.path, op->set_op.data,
4702                                         op->set_op.datalen, op->set_op.version);
4703                 rc = rc < 0 ? rc : serialize_SetDataRequest(oa, "req", &req);
4704                 result->stat = op->set_op.stat;
4705 
4706                 enter_critical(zh);
4707                 entry = create_completion_entry(zh, h.xid, COMPLETION_STAT, op_result_stat_completion, result, 0, 0);
4708                 leave_critical(zh);
4709                 free_duplicate_path(req.path, op->set_op.path);
4710                 break;
4711             }
4712 
4713             case ZOO_CHECK_OP: {
4714                 struct CheckVersionRequest req;
4715                 rc = rc < 0 ? rc : CheckVersionRequest_init(zh, &req,
4716                                         op->check_op.path, op->check_op.version);
4717                 rc = rc < 0 ? rc : serialize_CheckVersionRequest(oa, "req", &req);
4718 
4719                 enter_critical(zh);
4720                 entry = create_completion_entry(zh, h.xid, COMPLETION_VOID, op_result_void_completion, result, 0, 0);
4721                 leave_critical(zh);
4722                 free_duplicate_path(req.path, op->check_op.path);
4723                 break;
4724             }
4725 
4726             default:
4727                 LOG_ERROR(LOGCALLBACK(zh), "Unimplemented sub-op type=%d in multi-op", op->type);
4728                 return ZUNIMPLEMENTED;
4729         }
4730 
4731         queue_completion(&clist, entry, 0);
4732     }
4733 
4734     rc = rc < 0 ? rc : serialize_MultiHeader(oa, "multiheader", &mh);
4735 
4736     /* BEGIN: CRTICIAL SECTION */
4737     enter_critical(zh);
4738     rc = rc < 0 ? rc : add_multi_completion(zh, h.xid, completion, data, &clist);
4739     rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
4740             get_buffer_len(oa));
4741     leave_critical(zh);
4742 
4743     /* We queued the buffer, so don't free it */
4744     close_buffer_oarchive(&oa, 0);
4745 
4746     LOG_DEBUG(LOGCALLBACK(zh), "Sending multi request xid=%#x with %d subrequests to %s",
4747             h.xid, index, zoo_get_current_server(zh));
4748     /* make a best (non-blocking) effort to send the requests asap */
4749     adaptor_send_queue(zh, 0);
4750 
4751     return (rc < 0) ? ZMARSHALLINGERROR : ZOK;
4752 }
4753 
4754 typedef union WatchesRequest WatchesRequest;
4755 
4756 union WatchesRequest {
4757     struct CheckWatchesRequest check;
4758     struct RemoveWatchesRequest remove;
4759 };
4760 
4761 static int aremove_watches(
4762         zhandle_t *zh, const char *path, ZooWatcherType wtype,
4763         watcher_fn watcher, void *watcherCtx, int local,
4764         void_completion_t *completion, const void *data, int all)
4765 {
4766     char *server_path = prepend_string(zh, path);
4767     int rc;
4768     struct oarchive *oa;
4769     struct RequestHeader h = {
4770         get_xid(),
4771         all ? ZOO_REMOVE_WATCHES : ZOO_CHECK_WATCHES
4772     };
4773     WatchesRequest req;
4774     watcher_deregistration_t *wdo;
4775 
4776     if (!zh || !isValidPath(server_path, 0)) {
4777         rc = ZBADARGUMENTS;
4778         goto done;
4779     }
4780 
4781     if (!local && is_unrecoverable(zh)) {
4782         rc = ZINVALIDSTATE;
4783         goto done;
4784     }
4785 
4786     lock_watchers(zh);
4787     if (!pathHasWatcher(zh, server_path, wtype, watcher, watcherCtx)) {
4788         rc = ZNOWATCHER;
4789         unlock_watchers(zh);
4790         goto done;
4791     }
4792 
4793     if (local) {
4794         removeWatchers(zh, server_path, wtype, watcher, watcherCtx);
4795         unlock_watchers(zh);
4796 #ifdef THREADED
4797         notify_sync_completion((struct sync_completion *)data);
4798 #endif
4799         rc = ZOK;
4800         goto done;
4801     }
4802     unlock_watchers(zh);
4803 
4804     oa = create_buffer_oarchive();
4805     rc = serialize_RequestHeader(oa, "header", &h);
4806 
4807     if (all) {
4808        req.remove.path = (char*)server_path;
4809        req.remove.type = wtype;
4810        rc = rc < 0 ? rc : serialize_RemoveWatchesRequest(oa, "req", &req.remove);
4811     } else {
4812         req.check.path = (char*)server_path;
4813         req.check.type = wtype;
4814         rc = rc < 0 ? rc : serialize_CheckWatchesRequest(oa, "req", &req.check);
4815     }
4816 
4817     if (rc < 0) {
4818         goto done;
4819     }
4820 
4821     wdo = create_watcher_deregistration(
4822         server_path, watcher, watcherCtx, wtype);
4823 
4824     if (!wdo) {
4825         rc = ZSYSTEMERROR;
4826         goto done;
4827     }
4828 
4829     enter_critical(zh);
4830     rc = add_completion_deregistration(
4831         zh, h.xid, COMPLETION_VOID, completion, data, 0, wdo, 0);
4832     rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
4833             get_buffer_len(oa));
4834     rc = rc < 0 ? ZMARSHALLINGERROR : ZOK;
4835     leave_critical(zh);
4836 
4837     /* We queued the buffer, so don't free it */
4838     close_buffer_oarchive(&oa, 0);
4839 
4840     LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",
4841               h.xid, path, zoo_get_current_server(zh));
4842 
4843     adaptor_send_queue(zh, 0);
4844 
4845 done:
4846     free_duplicate_path(server_path, path);
4847     return rc;
4848 }
4849 void zoo_create_op_init(zoo_op_t *op, const char *path, const char *value,
4850         int valuelen,  const struct ACL_vector *acl, int mode,
4851         char *path_buffer, int path_buffer_len)
4852 {
4853     assert(op);
4854     op->type = get_create_op_type(mode, ZOO_CREATE_OP);
4855     op->create_op.path = path;
4856     op->create_op.data = value;
4857     op->create_op.datalen = valuelen;
4858     op->create_op.acl = acl;
4859     op->create_op.flags = mode;
4860     op->create_op.ttl = 0;
4861     op->create_op.buf = path_buffer;
4862     op->create_op.buflen = path_buffer_len;
4863 }
4864 
4865 void zoo_create2_op_init(zoo_op_t *op, const char *path, const char *value,
4866         int valuelen,  const struct ACL_vector *acl, int mode,
4867         char *path_buffer, int path_buffer_len)
4868 {
4869     assert(op);
4870     op->type = get_create_op_type(mode, ZOO_CREATE2_OP);
4871     op->create_op.path = path;
4872     op->create_op.data = value;
4873     op->create_op.datalen = valuelen;
4874     op->create_op.acl = acl;
4875     op->create_op.flags = mode;
4876     op->create_op.buf = path_buffer;
4877     op->create_op.buflen = path_buffer_len;
4878 }
4879 
4880 void zoo_delete_op_init(zoo_op_t *op, const char *path, int version)
4881 {
4882     assert(op);
4883     op->type = ZOO_DELETE_OP;
4884     op->delete_op.path = path;
4885     op->delete_op.version = version;
4886 }
4887 
4888 void zoo_set_op_init(zoo_op_t *op, const char *path, const char *buffer,
4889         int buflen, int version, struct Stat *stat)
4890 {
4891     assert(op);
4892     op->type = ZOO_SETDATA_OP;
4893     op->set_op.path = path;
4894     op->set_op.data = buffer;
4895     op->set_op.datalen = buflen;
4896     op->set_op.version = version;
4897     op->set_op.stat = stat;
4898 }
4899 
4900 void zoo_check_op_init(zoo_op_t *op, const char *path, int version)
4901 {
4902     assert(op);
4903     op->type = ZOO_CHECK_OP;
4904     op->check_op.path = path;
4905     op->check_op.version = version;
4906 }
4907 
/*
 * Sends the buffers queued on zh->to_send, head first, for as long as the
 * queue is non-empty and the handle is connected or a SASL authentication is
 * in progress.
 *
 * timeout is in milliseconds; specify a timeout of 0 to make the function
 * non-blocking.  With a non-zero timeout the socket is polled for
 * writability before every send attempt, using the time remaining since the
 * call started.
 *
 * Returns:
 *   ZOK               - the loop ended normally, or a non-blocking send
 *                       reported that it would block
 *   ZOPERATIONTIMEOUT - the timeout elapsed, or the poll/select call
 *                       returned 0
 *   ZSYSTEMERROR      - the poll/select call returned a negative value
 *   ZCONNECTIONLOSS   - send_buffer() returned a negative value
 *   a negative value from extract_request_type() if it fails while SASL
 *   authentication is in progress
 */
int flush_send_queue(zhandle_t*zh, int timeout)
{
    int rc= ZOK;
    struct timeval started;
#ifdef _WIN32
    fd_set pollSet;
    struct timeval wait;
#endif
    /* reference point for the elapsed-time computation below */
    get_system_time(&started);
    // we can't use dequeue_buffer() here because if (non-blocking) send_buffer()
    // returns EWOULDBLOCK we'd have to put the buffer back on the queue.
    // we use a recursive lock instead and only dequeue the buffer if a send was
    // successful
    lock_buffer_list(&zh->to_send);
    while (zh->to_send.head != 0 && (is_connected(zh) || is_sasl_auth_in_progress(zh))) {
        if (is_sasl_auth_in_progress(zh)) {
            // We don't let non-SASL packets escape as long as
            // negotiation is not complete.  (SASL packets are always
            // pushed to the front of the queue.)
            buffer_list_t *buff = zh->to_send.head;
            int32_t type;

            rc = extract_request_type(buff->buffer, buff->len, &type);

            // stop at the first head buffer that is not a SASL packet (or
            // whose type cannot be extracted); rc is returned as-is
            if (rc < 0 || type != ZOO_SASL_OP) {
                break;
            }
        }

        if(timeout!=0){
#ifndef _WIN32
            struct pollfd fds;
#endif
            int elapsed;
            struct timeval now;
            get_system_time(&now);
            // interval between the start of this call and now, compared
            // against timeout below
            elapsed=calculate_interval(&started,&now);
            if (elapsed>timeout) {
                rc = ZOPERATIONTIMEOUT;
                break;
            }

            // wait for the socket to become writable, for at most the
            // remaining part of the timeout
#ifdef _WIN32
            wait = get_timeval(timeout-elapsed);
            FD_ZERO(&pollSet);
            FD_SET(zh->fd->sock, &pollSet);
            // Poll the socket
            rc = select((int)(zh->fd->sock)+1, NULL,  &pollSet, NULL, &wait);
#else
            fds.fd = zh->fd->sock;
            fds.events = POLLOUT;
            fds.revents = 0;
            rc = poll(&fds, 1, timeout-elapsed);
#endif
            if (rc<=0) {
                /* timed out or an error or POLLERR */
                rc = rc==0 ? ZOPERATIONTIMEOUT : ZSYSTEMERROR;
                break;
            }
        }

        // the head stays on the queue during the send; it is only removed
        // below when send_buffer() returns a positive value
        rc = send_buffer(zh, zh->to_send.head);
        if(rc==0 && timeout==0){
            /* send_buffer would block while sending this buffer */
            rc = ZOK;
            break;
        }
        if (rc < 0) {
            rc = ZCONNECTIONLOSS;
            break;
        }
        // if the buffer has been sent successfully, remove it from the queue
        // (rc == 0 with a non-zero timeout leaves the buffer queued and the
        // loop tries again)
        if (rc > 0)
            remove_buffer(&zh->to_send);
        // record the time of this send attempt on the handle
        get_system_time(&zh->last_send);
        rc = ZOK;
    }
    unlock_buffer_list(&zh->to_send);
    return rc;
}
4990 
4991 const char* zerror(int c)
4992 {
4993     switch (c){
4994     case ZOK:
4995       return "ok";
4996     case ZSYSTEMERROR:
4997       return "system error";
4998     case ZRUNTIMEINCONSISTENCY:
4999       return "run time inconsistency";
5000     case ZDATAINCONSISTENCY:
5001       return "data inconsistency";
5002     case ZCONNECTIONLOSS:
5003       return "connection loss";
5004     case ZMARSHALLINGERROR:
5005       return "marshalling error";
5006     case ZUNIMPLEMENTED:
5007       return "unimplemented";
5008     case ZOPERATIONTIMEOUT:
5009       return "operation timeout";
5010     case ZBADARGUMENTS:
5011       return "bad arguments";
5012     case ZINVALIDSTATE:
5013       return "invalid zhandle state";
5014     case ZNEWCONFIGNOQUORUM:
5015       return "no quorum of new config is connected and up-to-date with the leader of last commmitted config - try invoking reconfiguration after new servers are connected and synced";
5016     case ZRECONFIGINPROGRESS:
5017       return "Another reconfiguration is in progress -- concurrent reconfigs not supported (yet)";
5018     case ZAPIERROR:
5019       return "api error";
5020     case ZNONODE:
5021       return "no node";
5022     case ZNOAUTH:
5023       return "not authenticated";
5024     case ZBADVERSION:
5025       return "bad version";
5026     case  ZNOCHILDRENFOREPHEMERALS:
5027       return "no children for ephemerals";
5028     case ZNODEEXISTS:
5029       return "node exists";
5030     case ZNOTEMPTY:
5031       return "not empty";
5032     case ZSESSIONEXPIRED:
5033       return "session expired";
5034     case ZINVALIDCALLBACK:
5035       return "invalid callback";
5036     case ZINVALIDACL:
5037       return "invalid acl";
5038     case ZAUTHFAILED:
5039       return "authentication failed";
5040     case ZCLOSING:
5041       return "zookeeper is closing";
5042     case ZNOTHING:
5043       return "(not error) no server responses to process";
5044     case ZSESSIONMOVED:
5045       return "session moved to another server, so operation is ignored";
5046     case ZNOTREADONLY:
5047       return "state-changing request is passed to read-only server";
5048     case ZEPHEMERALONLOCALSESSION:
5049       return "attempt to create ephemeral node on a local session";
5050     case ZNOWATCHER:
5051       return "the watcher couldn't be found";
5052     case ZRECONFIGDISABLED:
5053       return "attempts to perform a reconfiguration operation when reconfiguration feature is disable";
5054     case ZSESSIONCLOSEDREQUIRESASLAUTH:
5055       return "session closed by server because client is required to do SASL authentication";
5056     case ZTHROTTLEDOP:
5057       return "Operation was throttled due to high load";
5058     }
5059     if (c > 0) {
5060       return strerror(c);
5061     }
5062     return "unknown error";
5063 }
5064 
5065 int zoo_add_auth(zhandle_t *zh,const char* scheme,const char* cert,
5066         int certLen,void_completion_t completion, const void *data)
5067 {
5068     struct buffer auth;
5069     auth_info *authinfo;
5070     if(scheme==NULL || zh==NULL)
5071         return ZBADARGUMENTS;
5072 
5073     if (is_unrecoverable(zh))
5074         return ZINVALIDSTATE;
5075 
5076     // [ZOOKEEPER-800] zoo_add_auth should return ZINVALIDSTATE if
5077     // the connection is closed.
5078     if (zoo_state(zh) == 0) {
5079         return ZINVALIDSTATE;
5080     }
5081 
5082     if(cert!=NULL && certLen!=0){
5083         auth.buff=calloc(1,certLen);
5084         if(auth.buff==0) {
5085             return ZSYSTEMERROR;
5086         }
5087         memcpy(auth.buff,cert,certLen);
5088         auth.len=certLen;
5089     } else {
5090         auth.buff = 0;
5091         auth.len = 0;
5092     }
5093 
5094     zoo_lock_auth(zh);
5095     authinfo = (auth_info*) malloc(sizeof(auth_info));
5096     authinfo->scheme=strdup(scheme);
5097     authinfo->auth=auth;
5098     authinfo->completion=completion;
5099     authinfo->data=data;
5100     authinfo->next = NULL;
5101     add_last_auth(&zh->auth_h, authinfo);
5102     zoo_unlock_auth(zh);
5103 
5104     if (is_connected(zh) ||
5105         // When associating, only send info packets if no SASL
5106         // negotiation is planned.  (Such packets would be queued in
5107         // front of SASL packets, which is forbidden, and SASL
5108         // completion is followed by a 'send_auth_info' anyway.)
5109         (zh->state == ZOO_ASSOCIATING_STATE && !has_sasl_client(zh))) {
5110         return send_last_auth_info(zh);
5111     }
5112 
5113     return ZOK;
5114 }
5115 
/**
 * Formats a socket address as "addr:port" (or "[addr]:port" for AF_INET6).
 *
 * The text is written into a function-local static buffer, so the returned
 * pointer is overwritten by the next call.  A NULL \p ep yields the literal
 * "null".  Any family other than AF_INET6 is read through the sockaddr_in
 * layout.
 *
 * \param ep endpoint to format, may be NULL
 * \return pointer to the formatted text.
 */
static const char* format_endpoint_info(const struct sockaddr_storage* ep)
{
    static char buf[134] = { 0 };
    char addrstr[INET6_ADDRSTRLEN] = { 0 };
    const char *fmt;
    const void *raw_addr;
    int bracketed = 0;      /* set for IPv6 so the address gets [ ] */
    int net_port;           /* port still in network byte order */
#ifdef _WIN32
    char *win_addr;
#endif

    if (ep == 0) {
        return "null";
    }

#if defined(AF_INET6)
    if (ep->ss_family == AF_INET6) {
        const struct sockaddr_in6 *sa6 = (const struct sockaddr_in6 *)ep;

        raw_addr = &sa6->sin6_addr;
        net_port = sa6->sin6_port;
        bracketed = 1;
    } else
#endif
    {
        const struct sockaddr_in *sa4 = (const struct sockaddr_in *)ep;

        raw_addr = &sa4->sin_addr;
        net_port = sa4->sin_port;
    }

    fmt = bracketed ? "[%s]:%d" : "%s:%d";

#ifdef _WIN32
    win_addr = inet_ntoa(*(const struct in_addr *)raw_addr);
    sprintf(buf, fmt, win_addr, ntohs(net_port));
#else
    inet_ntop(ep->ss_family, raw_addr, addrstr, sizeof(addrstr) - 1);
    sprintf(buf, fmt, addrstr, ntohs(net_port));
#endif
    return buf;
}
5152 
5153 log_callback_fn zoo_get_log_callback(const zhandle_t* zh)
5154 {
5155     // Verify we have a valid handle
5156     if (zh == NULL) {
5157         return NULL;
5158     }
5159 
5160     return zh->log_callback;
5161 }
5162 
5163 void zoo_set_log_callback(zhandle_t *zh, log_callback_fn callback)
5164 {
5165     // Verify we have a valid handle
5166     if (zh == NULL) {
5167         return;
5168     }
5169 
5170     zh->log_callback = callback;
5171 }
5172 
5173 void zoo_deterministic_conn_order(int yesOrNo)
5174 {
5175     disable_conn_permute=yesOrNo;
5176 }
5177 
5178 #ifdef THREADED
5179 
5180 static void process_sync_completion(zhandle_t *zh,
5181         completion_list_t *cptr,
5182         struct sync_completion *sc,
5183         struct iarchive *ia)
5184 {
5185     LOG_DEBUG(LOGCALLBACK(zh), "Processing sync_completion with type=%d xid=%#x rc=%d",
5186             cptr->c.type, cptr->xid, sc->rc);
5187 
5188     switch(cptr->c.type) {
5189     case COMPLETION_DATA:
5190         if (sc->rc==0) {
5191             struct GetDataResponse res;
5192             int len;
5193             deserialize_GetDataResponse(ia, "reply", &res);
5194             if (res.data.len <= sc->u.data.buff_len) {
5195                 len = res.data.len;
5196             } else {
5197                 len = sc->u.data.buff_len;
5198             }
5199             sc->u.data.buff_len = len;
5200             // check if len is negative
5201             // just of NULL which is -1 int
5202             if (len == -1) {
5203                 sc->u.data.buffer = NULL;
5204             } else {
5205                 memcpy(sc->u.data.buffer, res.data.buff, len);
5206             }
5207             sc->u.data.stat = res.stat;
5208             deallocate_GetDataResponse(&res);
5209         }
5210         break;
5211     case COMPLETION_STAT:
5212         if (sc->rc==0) {
5213             struct SetDataResponse res;
5214             deserialize_SetDataResponse(ia, "reply", &res);
5215             sc->u.stat = res.stat;
5216             deallocate_SetDataResponse(&res);
5217         }
5218         break;
5219     case COMPLETION_STRINGLIST:
5220         if (sc->rc==0) {
5221             struct GetChildrenResponse res;
5222             deserialize_GetChildrenResponse(ia, "reply", &res);
5223             sc->u.strs2 = res.children;
5224             /* We don't deallocate since we are passing it back */
5225             // deallocate_GetChildrenResponse(&res);
5226         }
5227         break;
5228     case COMPLETION_STRINGLIST_STAT:
5229         if (sc->rc==0) {
5230             struct GetChildren2Response res;
5231             deserialize_GetChildren2Response(ia, "reply", &res);
5232             sc->u.strs_stat.strs2 = res.children;
5233             sc->u.strs_stat.stat2 = res.stat;
5234             /* We don't deallocate since we are passing it back */
5235             // deallocate_GetChildren2Response(&res);
5236         }
5237         break;
5238     case COMPLETION_STRING:
5239         if (sc->rc==0) {
5240             struct CreateResponse res;
5241             int len;
5242             const char * client_path;
5243             deserialize_CreateResponse(ia, "reply", &res);
5244             //ZOOKEEPER-1027
5245             client_path = sub_string(zh, res.path);
5246             len = strlen(client_path) + 1;if (len > sc->u.str.str_len) {
5247                 len = sc->u.str.str_len;
5248             }
5249             if (len > 0) {
5250                 memcpy(sc->u.str.str, client_path, len - 1);
5251                 sc->u.str.str[len - 1] = '\0';
5252             }
5253             free_duplicate_path(client_path, res.path);
5254             deallocate_CreateResponse(&res);
5255         }
5256         break;
5257     case COMPLETION_STRING_STAT:
5258         if (sc->rc==0) {
5259             struct Create2Response res;
5260             int len;
5261             const char * client_path;
5262             deserialize_Create2Response(ia, "reply", &res);
5263             client_path = sub_string(zh, res.path);
5264             len = strlen(client_path) + 1;
5265             if (len > sc->u.str.str_len) {
5266                 len = sc->u.str.str_len;
5267             }
5268             if (len > 0) {
5269                 memcpy(sc->u.str.str, client_path, len - 1);
5270                 sc->u.str.str[len - 1] = '\0';
5271             }
5272             free_duplicate_path(client_path, res.path);
5273             sc->u.stat = res.stat;
5274             deallocate_Create2Response(&res);
5275         }
5276         break;
5277     case COMPLETION_ACLLIST:
5278         if (sc->rc==0) {
5279             struct GetACLResponse res;
5280             deserialize_GetACLResponse(ia, "reply", &res);
5281             sc->u.acl.acl = res.acl;
5282             sc->u.acl.stat = res.stat;
5283             /* We don't deallocate since we are passing it back */
5284             //deallocate_GetACLResponse(&res);
5285         }
5286         break;
5287     case COMPLETION_VOID:
5288         break;
5289     case COMPLETION_MULTI:
5290         sc->rc = deserialize_multi(zh, cptr->xid, cptr, ia);
5291         break;
5292     default:
5293         LOG_DEBUG(LOGCALLBACK(zh), "Unsupported completion type=%d", cptr->c.type);
5294         break;
5295     }
5296 }
5297 
5298 /*---------------------------------------------------------------------------*
5299  * SYNC API
5300  *---------------------------------------------------------------------------*/
5301 int zoo_create(zhandle_t *zh, const char *path, const char *value,
5302         int valuelen, const struct ACL_vector *acl, int mode,
5303         char *path_buffer, int path_buffer_len)
5304 {
5305     return zoo_create_ttl(zh, path, value, valuelen, acl, mode, -1,
5306         path_buffer, path_buffer_len);
5307 }
5308 
5309 int zoo_create_ttl(zhandle_t *zh, const char *path, const char *value,
5310         int valuelen, const struct ACL_vector *acl, int mode, int64_t ttl,
5311         char *path_buffer, int path_buffer_len)
5312 {
5313     struct sync_completion *sc = alloc_sync_completion();
5314     int rc;
5315     if (!sc) {
5316         return ZSYSTEMERROR;
5317     }
5318     sc->u.str.str = path_buffer;
5319     sc->u.str.str_len = path_buffer_len;
5320     rc=zoo_acreate_ttl(zh, path, value, valuelen, acl, mode, ttl, SYNCHRONOUS_MARKER, sc);
5321     if(rc==ZOK){
5322         wait_sync_completion(sc);
5323         rc = sc->rc;
5324     }
5325     free_sync_completion(sc);
5326     return rc;
5327 }
5328 
5329 int zoo_create2(zhandle_t *zh, const char *path, const char *value,
5330         int valuelen, const struct ACL_vector *acl, int mode,
5331         char *path_buffer, int path_buffer_len, struct Stat *stat)
5332 {
5333     return zoo_create2_ttl(zh, path, value, valuelen, acl, mode, -1,
5334         path_buffer, path_buffer_len, stat);
5335 }
5336 
5337 int zoo_create2_ttl(zhandle_t *zh, const char *path, const char *value,
5338         int valuelen, const struct ACL_vector *acl, int mode, int64_t ttl,
5339         char *path_buffer, int path_buffer_len, struct Stat *stat)
5340 {
5341     struct sync_completion *sc = alloc_sync_completion();
5342     int rc;
5343     if (!sc) {
5344         return ZSYSTEMERROR;
5345     }
5346 
5347     sc->u.str.str = path_buffer;
5348     sc->u.str.str_len = path_buffer_len;
5349     rc=zoo_acreate2_ttl(zh, path, value, valuelen, acl, mode, ttl, SYNCHRONOUS_MARKER, sc);
5350     if(rc==ZOK){
5351         wait_sync_completion(sc);
5352         rc = sc->rc;
5353         if (rc == 0 && stat) {
5354             *stat = sc->u.stat;
5355         }
5356     }
5357     free_sync_completion(sc);
5358     return rc;
5359 }
5360 
5361 int zoo_delete(zhandle_t *zh, const char *path, int version)
5362 {
5363     struct sync_completion *sc = alloc_sync_completion();
5364     int rc;
5365     if (!sc) {
5366         return ZSYSTEMERROR;
5367     }
5368     rc=zoo_adelete(zh, path, version, SYNCHRONOUS_MARKER, sc);
5369     if(rc==ZOK){
5370         wait_sync_completion(sc);
5371         rc = sc->rc;
5372     }
5373     free_sync_completion(sc);
5374     return rc;
5375 }
5376 
5377 int zoo_exists(zhandle_t *zh, const char *path, int watch, struct Stat *stat)
5378 {
5379     return zoo_wexists(zh,path,watch?zh->watcher:0,zh->context,stat);
5380 }
5381 
5382 int zoo_wexists(zhandle_t *zh, const char *path,
5383         watcher_fn watcher, void* watcherCtx, struct Stat *stat)
5384 {
5385     struct sync_completion *sc = alloc_sync_completion();
5386     int rc;
5387     if (!sc) {
5388         return ZSYSTEMERROR;
5389     }
5390     rc=zoo_awexists(zh,path,watcher,watcherCtx,SYNCHRONOUS_MARKER, sc);
5391     if(rc==ZOK){
5392         wait_sync_completion(sc);
5393         rc = sc->rc;
5394         if (rc == 0&& stat) {
5395             *stat = sc->u.stat;
5396         }
5397     }
5398     free_sync_completion(sc);
5399     return rc;
5400 }
5401 
5402 int zoo_get(zhandle_t *zh, const char *path, int watch, char *buffer,
5403         int* buffer_len, struct Stat *stat)
5404 {
5405     return zoo_wget(zh,path,watch?zh->watcher:0,zh->context,
5406             buffer,buffer_len,stat);
5407 }
5408 
5409 int zoo_wget(zhandle_t *zh, const char *path,
5410         watcher_fn watcher, void* watcherCtx,
5411         char *buffer, int* buffer_len, struct Stat *stat)
5412 {
5413     struct sync_completion *sc;
5414     int rc=0;
5415 
5416     if(buffer_len==NULL)
5417         return ZBADARGUMENTS;
5418     if((sc=alloc_sync_completion())==NULL)
5419         return ZSYSTEMERROR;
5420 
5421     sc->u.data.buffer = buffer;
5422     sc->u.data.buff_len = *buffer_len;
5423     rc=zoo_awget(zh, path, watcher, watcherCtx, SYNCHRONOUS_MARKER, sc);
5424     if(rc==ZOK){
5425         wait_sync_completion(sc);
5426         rc = sc->rc;
5427         if (rc == 0) {
5428             if(stat)
5429                 *stat = sc->u.data.stat;
5430             *buffer_len = sc->u.data.buff_len;
5431         }
5432     }
5433     free_sync_completion(sc);
5434     return rc;
5435 }
5436 
5437 int zoo_getconfig(zhandle_t *zh, int watch, char *buffer,
5438         int* buffer_len, struct Stat *stat)
5439 {
5440     return zoo_wget(zh,ZOO_CONFIG_NODE,watch?zh->watcher:0,zh->context, buffer,buffer_len,stat);
5441 }
5442 
5443 int zoo_wgetconfig(zhandle_t *zh, watcher_fn watcher, void* watcherCtx,
5444        char *buffer, int* buffer_len, struct Stat *stat)
5445 {
5446    return zoo_wget(zh, ZOO_CONFIG_NODE, watcher, watcherCtx, buffer, buffer_len, stat);
5447 }
5448 
5449 
5450 int zoo_reconfig(zhandle_t *zh, const char *joining, const char *leaving,
5451        const char *members, int64_t version, char *buffer, int* buffer_len,
5452        struct Stat *stat)
5453 {
5454     struct sync_completion *sc;
5455     int rc=0;
5456 
5457     if(buffer_len==NULL)
5458         return ZBADARGUMENTS;
5459     if((sc=alloc_sync_completion())==NULL)
5460         return ZSYSTEMERROR;
5461 
5462     sc->u.data.buffer = buffer;
5463     sc->u.data.buff_len = *buffer_len;
5464     rc=zoo_areconfig(zh, joining, leaving, members, version, SYNCHRONOUS_MARKER, sc);
5465 
5466     if(rc==ZOK){
5467         wait_sync_completion(sc);
5468         rc = sc->rc;
5469         if (rc == 0) {
5470             if(stat)
5471                 *stat = sc->u.data.stat;
5472             *buffer_len = sc->u.data.buff_len;
5473         }
5474     }
5475     free_sync_completion(sc);
5476     return rc;
5477 }
5478 
5479 int zoo_set(zhandle_t *zh, const char *path, const char *buffer, int buflen,
5480         int version)
5481 {
5482   return zoo_set2(zh, path, buffer, buflen, version, 0);
5483 }
5484 
5485 int zoo_set2(zhandle_t *zh, const char *path, const char *buffer, int buflen,
5486         int version, struct Stat *stat)
5487 {
5488     struct sync_completion *sc = alloc_sync_completion();
5489     int rc;
5490     if (!sc) {
5491         return ZSYSTEMERROR;
5492     }
5493     rc=zoo_aset(zh, path, buffer, buflen, version, SYNCHRONOUS_MARKER, sc);
5494     if(rc==ZOK){
5495         wait_sync_completion(sc);
5496         rc = sc->rc;
5497         if (rc == 0 && stat) {
5498             *stat = sc->u.stat;
5499         }
5500     }
5501     free_sync_completion(sc);
5502     return rc;
5503 }
5504 
5505 static int zoo_wget_children_(zhandle_t *zh, const char *path,
5506         watcher_fn watcher, void* watcherCtx,
5507         struct String_vector *strings)
5508 {
5509     struct sync_completion *sc = alloc_sync_completion();
5510     int rc;
5511     if (!sc) {
5512         return ZSYSTEMERROR;
5513     }
5514     rc= zoo_awget_children (zh, path, watcher, watcherCtx, SYNCHRONOUS_MARKER, sc);
5515     if(rc==ZOK){
5516         wait_sync_completion(sc);
5517         rc = sc->rc;
5518         if (rc == 0) {
5519             if (strings) {
5520                 *strings = sc->u.strs2;
5521             } else {
5522                 deallocate_String_vector(&sc->u.strs2);
5523             }
5524         }
5525     }
5526     free_sync_completion(sc);
5527     return rc;
5528 }
5529 
5530 static int zoo_wget_children2_(zhandle_t *zh, const char *path,
5531         watcher_fn watcher, void* watcherCtx,
5532         struct String_vector *strings, struct Stat *stat)
5533 {
5534     struct sync_completion *sc = alloc_sync_completion();
5535     int rc;
5536     if (!sc) {
5537         return ZSYSTEMERROR;
5538     }
5539     rc= zoo_awget_children2(zh, path, watcher, watcherCtx, SYNCHRONOUS_MARKER, sc);
5540 
5541     if(rc==ZOK){
5542         wait_sync_completion(sc);
5543         rc = sc->rc;
5544         if (rc == 0) {
5545             *stat = sc->u.strs_stat.stat2;
5546             if (strings) {
5547                 *strings = sc->u.strs_stat.strs2;
5548             } else {
5549                 deallocate_String_vector(&sc->u.strs_stat.strs2);
5550             }
5551         }
5552     }
5553     free_sync_completion(sc);
5554     return rc;
5555 }
5556 
5557 int zoo_get_children(zhandle_t *zh, const char *path, int watch,
5558         struct String_vector *strings)
5559 {
5560     return zoo_wget_children_(zh,path,watch?zh->watcher:0,zh->context,strings);
5561 }
5562 
5563 int zoo_wget_children(zhandle_t *zh, const char *path,
5564         watcher_fn watcher, void* watcherCtx,
5565         struct String_vector *strings)
5566 {
5567     return zoo_wget_children_(zh,path,watcher,watcherCtx,strings);
5568 }
5569 
5570 int zoo_get_children2(zhandle_t *zh, const char *path, int watch,
5571         struct String_vector *strings, struct Stat *stat)
5572 {
5573     return zoo_wget_children2_(zh,path,watch?zh->watcher:0,zh->context,strings,stat);
5574 }
5575 
5576 int zoo_wget_children2(zhandle_t *zh, const char *path,
5577         watcher_fn watcher, void* watcherCtx,
5578         struct String_vector *strings, struct Stat *stat)
5579 {
5580     return zoo_wget_children2_(zh,path,watcher,watcherCtx,strings,stat);
5581 }
5582 
5583 int zoo_get_acl(zhandle_t *zh, const char *path, struct ACL_vector *acl,
5584         struct Stat *stat)
5585 {
5586     struct sync_completion *sc = alloc_sync_completion();
5587     int rc;
5588     if (!sc) {
5589         return ZSYSTEMERROR;
5590     }
5591     rc=zoo_aget_acl(zh, path, SYNCHRONOUS_MARKER, sc);
5592     if(rc==ZOK){
5593         wait_sync_completion(sc);
5594         rc = sc->rc;
5595         if (rc == 0&& stat) {
5596             *stat = sc->u.acl.stat;
5597         }
5598         if (rc == 0) {
5599             if (acl) {
5600                 *acl = sc->u.acl.acl;
5601             } else {
5602                 deallocate_ACL_vector(&sc->u.acl.acl);
5603             }
5604         }
5605     }
5606     free_sync_completion(sc);
5607     return rc;
5608 }
5609 
5610 int zoo_set_acl(zhandle_t *zh, const char *path, int version,
5611         const struct ACL_vector *acl)
5612 {
5613     struct sync_completion *sc = alloc_sync_completion();
5614     int rc;
5615     if (!sc) {
5616         return ZSYSTEMERROR;
5617     }
5618     rc=zoo_aset_acl(zh, path, version, (struct ACL_vector*)acl,
5619             SYNCHRONOUS_MARKER, sc);
5620     if(rc==ZOK){
5621         wait_sync_completion(sc);
5622         rc = sc->rc;
5623     }
5624     free_sync_completion(sc);
5625     return rc;
5626 }
5627 
5628 static int remove_watches(
5629     zhandle_t *zh, const char *path, ZooWatcherType wtype,
5630     watcher_fn watcher, void *wctx, int local, int all)
5631 {
5632     int rc = 0;
5633     struct sync_completion *sc;
5634 
5635     if (!path)
5636         return ZBADARGUMENTS;
5637 
5638     sc = alloc_sync_completion();
5639     if (!sc)
5640         return ZSYSTEMERROR;
5641 
5642     rc = aremove_watches(zh, path, wtype, watcher, wctx, local,
5643                               SYNCHRONOUS_MARKER, sc, all);
5644     if (rc == ZOK) {
5645         wait_sync_completion(sc);
5646         rc = sc->rc;
5647     }
5648     free_sync_completion(sc);
5649     return rc;
5650 }
5651 
5652 int zoo_multi(zhandle_t *zh, int count, const zoo_op_t *ops, zoo_op_result_t *results)
5653 {
5654     int rc;
5655 
5656     struct sync_completion *sc = alloc_sync_completion();
5657     if (!sc) {
5658         return ZSYSTEMERROR;
5659     }
5660 
5661     rc = zoo_amulti(zh, count, ops, results, SYNCHRONOUS_MARKER, sc);
5662     if (rc == ZOK) {
5663         wait_sync_completion(sc);
5664         rc = sc->rc;
5665     }
5666     free_sync_completion(sc);
5667 
5668     return rc;
5669 }
5670 
5671 int zoo_remove_watches(zhandle_t *zh, const char *path, ZooWatcherType wtype,
5672          watcher_fn watcher, void *watcherCtx, int local)
5673 {
5674     return remove_watches(zh, path, wtype, watcher, watcherCtx, local, 0);
5675 }
5676 
5677 int zoo_remove_all_watches(
5678         zhandle_t *zh, const char *path, ZooWatcherType wtype, int local)
5679 {
5680     return remove_watches(zh, path, wtype, NULL, NULL, local, 1);
5681 
5682 }
5683 #endif
5684 
5685 int zoo_aremove_watches(zhandle_t *zh, const char *path, ZooWatcherType wtype,
5686         watcher_fn watcher, void *watcherCtx, int local,
5687         void_completion_t *completion, const void *data)
5688 {
5689     return aremove_watches(
5690         zh, path, wtype, watcher, watcherCtx, local, completion, data, 0);
5691 }
5692 
5693 int zoo_aremove_all_watches(zhandle_t *zh, const char *path,
5694         ZooWatcherType wtype, int local, void_completion_t *completion,
5695         const void *data)
5696 {
5697     return aremove_watches(
5698         zh, path, wtype, NULL, NULL, local, completion, data, 1);
5699 }
5700