1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 #ifdef DLL_EXPORT
20 #define USE_STATIC_LIB
21 #endif
22 
23 #if defined(__CYGWIN__)
24 #define USE_IPV6
25 #endif
26 
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <inttypes.h>
#include <zookeeper_log.h>
#include <time.h>
#include <sys/time.h>
#include <sys/socket.h>
#include <limits.h>
#include <zoo_lock.h>
#include <stdbool.h>
#ifdef HAVE_SYS_UTSNAME_H
#include <sys/utsname.h>
#endif

#ifdef HAVE_GETPWUID_R
#include <pwd.h>
#endif
44 
45 #define IF_DEBUG(x) if (logLevel==ZOO_LOG_LEVEL_DEBUG) {x;}
46 
47 
zkr_lock_init(zkr_lock_mutex_t * mutex,zhandle_t * zh,char * path,struct ACL_vector * acl)48 ZOOAPI int zkr_lock_init(zkr_lock_mutex_t* mutex, zhandle_t* zh,
49                       char* path, struct ACL_vector *acl) {
50     mutex->zh = zh;
51     mutex->path = path;
52     mutex->acl = acl;
53     mutex->completion = NULL;
54     mutex->cbdata = NULL;
55     mutex->id = NULL;
56     mutex->ownerid = NULL;
57     mutex->isOwner = 0;
58     pthread_mutex_init(&(mutex->pmutex), NULL);
59     return 0;
60 }
61 
zkr_lock_init_cb(zkr_lock_mutex_t * mutex,zhandle_t * zh,char * path,struct ACL_vector * acl,zkr_lock_completion completion,void * cbdata)62 ZOOAPI int zkr_lock_init_cb(zkr_lock_mutex_t *mutex, zhandle_t* zh,
63                          char *path, struct ACL_vector *acl,
64                          zkr_lock_completion completion, void* cbdata) {
65     mutex->zh = zh;
66     mutex->path = path;
67     mutex->acl = acl;
68     mutex->completion = completion;
69     mutex->cbdata = cbdata;
70     mutex->isOwner = 0;
71     mutex->ownerid = NULL;
72     mutex->id = NULL;
73     pthread_mutex_init(&(mutex->pmutex), NULL);
74     return 0;
75 }
76 
/**
 * Delete this client's lock node (mutex->path + "/" + mutex->id) without
 * taking mutex->pmutex.  Its callers in this file (zkr_lock_unlock and
 * zkr_lock_operation) already hold that mutex.
 *
 * The delete is retried up to 3 times while it fails with ZCONNECTIONLOSS,
 * pausing 0.5 ms between attempts.  A node that is already gone (ZNONODE)
 * counts as released.  On success the completion callback (if any) is
 * invoked with 1.  Then mutex->id is freed and cleared, and isOwner is
 * reset.
 *
 * \return 0 on success, ZCONNECTIONLOSS if the node could not be deleted,
 *         ZSYSTEMERROR if the mutex currently holds no node.
 */
static int _zkr_lock_unlock_nolock(zkr_lock_mutex_t *mutex) {
    zhandle_t *zh = mutex->zh;
    if (mutex->id != NULL) {
        int len = strlen(mutex->path) + strlen(mutex->id) + 2;
        char buf[len];
        int ret = ZCONNECTIONLOSS;
        int count = 0;
        struct timespec ts;
        ts.tv_sec = 0;
        ts.tv_nsec = 500000; /* 0.5 ms */
        snprintf(buf, len, "%s/%s", mutex->path, mutex->id);
        while (ret == ZCONNECTIONLOSS && (count < 3)) {
            ret = zoo_delete(zh, buf, -1);
            if (ret == ZCONNECTIONLOSS) {
                LOG_DEBUG(LOGCALLBACK(zh), ("connectionloss while deleting the node"));
                nanosleep(&ts, 0);
                count++;
            }
        }
        if (ret == ZOK || ret == ZNONODE) {
            zkr_lock_completion completion = mutex->completion;
            if (completion != NULL) {
                completion(1, mutex->cbdata);
            }

            free(mutex->id);
            mutex->id = NULL;
            /* our node is gone, so we cannot be the owner any more */
            mutex->isOwner = 0;
            return 0;
        }
        LOG_WARN(LOGCALLBACK(zh), ("not able to connect to server - giving up"));
        return ZCONNECTIONLOSS;
    }

    return ZSYSTEMERROR;
}
113 /**
114  * unlock the mutex
115  */
zkr_lock_unlock(zkr_lock_mutex_t * mutex)116 ZOOAPI int zkr_lock_unlock(zkr_lock_mutex_t *mutex) {
117 	int ret = 0;
118     pthread_mutex_lock(&(mutex->pmutex));
119     ret = _zkr_lock_unlock_nolock(mutex);
120     pthread_mutex_unlock(&(mutex->pmutex));
121     return ret;
122 }
123 
free_String_vector(struct String_vector * v)124 static void free_String_vector(struct String_vector *v) {
125     if (v->data) {
126         int32_t i;
127         for (i=0; i<v->count; i++) {
128             free(v->data[i]);
129         }
130         free(v->data);
131         v->data = 0;
132     }
133 }
134 
/**
 * Compare two lock node names by their sequence suffix, i.e. the text after
 * the last '-' ("x-<session>-<sequence>" compares as "<sequence>").
 *
 * A name that contains no '-' is compared in full.  Without this fallback,
 * strrchr() would return NULL and NULL + 1 would be dereferenced.  So a
 * foreign child under the lock directory could crash the sort.
 * \return <0, 0 or >0 like strcmp()
 */
static int strcmp_suffix(const char *str1, const char *str2) {
    const char *dash1 = strrchr(str1, '-');
    const char *dash2 = strrchr(str2, '-');
    const char *suffix1 = (dash1 != NULL) ? dash1 + 1 : str1;
    const char *suffix2 = (dash2 != NULL) ? dash2 + 1 : str2;
    return strcmp(suffix1, suffix2);
}
138 
/**
 * qsort() comparator for an array of char* lock node names: dereferences
 * each element and orders the names by their sequence suffix.
 */
static int vstrcmp(const void* str1, const void* str2) {
    const char *left = *(const char * const *) str1;
    const char *right = *(const char * const *) str2;
    return strcmp_suffix(left, right);
}
144 
sort_children(struct String_vector * vector)145 static void sort_children(struct String_vector *vector) {
146     qsort( vector->data, vector->count, sizeof(char*), &vstrcmp);
147 }
148 
/**
 * Binary-search sorted_data (len entries, ordered by strcmp_suffix) for the
 * entry whose suffix equals element's.
 * \return the entry immediately before the match, or NULL when element is
 *         first in the list or not present at all.
 */
static char* child_floor(char **sorted_data, int len, char *element) {
    int lo = 0;
    int hi = len - 1;

    while (lo <= hi) {
        int const mid = lo + (hi - lo) / 2;
        int const order = strcmp_suffix(sorted_data[mid], element);
        if (order == 0)
            return (mid > 0) ? sorted_data[mid - 1] : NULL;
        if (order < 0)
            lo = mid + 1;
        else
            hi = mid - 1;
    }
    return NULL;
}
171 
/**
 * Watcher placed on our predecessor's node.  It ignores the event details
 * and re-runs zkr_lock_lock on the mutex passed as the watcher context.
 */
static void lock_watcher_fn(zhandle_t* zh, int type, int state,
                            const char* path, void *watcherCtx) {
    zkr_lock_mutex_t *waiting = (zkr_lock_mutex_t *) watcherCtx;
    (void) zh;
    (void) type;
    (void) state;
    (void) path;
    zkr_lock_lock(waiting);
}
178 
/**
 * Return a newly allocated copy of the last component of a '/'-separated
 * path, i.e. everything after the final '/'.
 * \return the copy (caller frees it), or NULL if str contains no '/' or
 *         the allocation fails.
 */
static char* getName(char* str) {
    const char *slash = strrchr(str, '/');
    if (slash != NULL) {
        const char *base = slash + 1;
        size_t size = strlen(base) + 1;
        char *copy = malloc(size);
        if (copy != NULL)
            memcpy(copy, base, size);
        return copy;
    }
    return NULL;
}
188 
189 /**
190  * just a method to retry get children
191  */
retry_getchildren(zhandle_t * zh,char * path,struct String_vector * vector,struct timespec * ts,int retry)192 static int retry_getchildren(zhandle_t *zh, char* path, struct String_vector *vector,
193                              struct timespec *ts, int retry) {
194     int ret = ZCONNECTIONLOSS;
195     int count = 0;
196     while (ret == ZCONNECTIONLOSS && count < retry) {
197         ret = zoo_get_children(zh, path, 0, vector);
198         if (ret == ZCONNECTIONLOSS) {
199             LOG_DEBUG(LOGCALLBACK(zh), ("connection loss to the server"));
200             nanosleep(ts, 0);
201             count++;
202         }
203     }
204     return ret;
205 }
206 
207 /** see if our node already exists
208  * if it does then we dup the name and
209  * return it
210  */
lookupnode(struct String_vector * vector,char * prefix)211 static char* lookupnode(struct String_vector *vector, char *prefix) {
212     char *ret = NULL;
213     if (vector->data) {
214         int i = 0;
215         for (i = 0; i < vector->count; i++) {
216             char* child = vector->data[i];
217             if (strncmp(prefix, child, strlen(prefix)) == 0) {
218                 ret = strdup(child);
219                 break;
220             }
221         }
222     }
223     return ret;
224 }
225 
226 /** retry zoo_wexists
227  */
retry_zoowexists(zhandle_t * zh,char * path,watcher_fn watcher,void * ctx,struct Stat * stat,struct timespec * ts,int retry)228 static int retry_zoowexists(zhandle_t *zh, char* path, watcher_fn watcher, void* ctx,
229                             struct Stat *stat, struct timespec *ts, int retry) {
230     int ret = ZCONNECTIONLOSS;
231     int count = 0;
232     while (ret == ZCONNECTIONLOSS && count < retry) {
233         ret = zoo_wexists(zh, path, watcher, ctx, stat);
234         if (ret == ZCONNECTIONLOSS) {
235             LOG_DEBUG(LOGCALLBACK(zh), ("connectionloss while setting watch on my predecessor"));
236             nanosleep(ts, 0);
237             count++;
238         }
239     }
240     return ret;
241 }
242 
243 /**
244  * the main code that does the zookeeper leader
245  * election. this code creates its own ephemeral
246  * node on the given path and sees if its the first
247  * one on the list and claims to be a leader if and only
248  * if its the first one of children in the paretn path
249  */
zkr_lock_operation(zkr_lock_mutex_t * mutex,struct timespec * ts)250 static int zkr_lock_operation(zkr_lock_mutex_t *mutex, struct timespec *ts) {
251     zhandle_t *zh = mutex->zh;
252     char *path = mutex->path;
253     char *id = mutex->id;
254     struct Stat stat;
255     char* owner_id = NULL;
256     int retry = 3;
257     do {
258         const clientid_t *cid = zoo_client_id(zh);
259         // get the session id
260         int64_t session = cid->client_id;
261         char prefix[30];
262         int ret = 0;
263 #if defined(__x86_64__)
264         snprintf(prefix, 30, "x-%016lx-", session);
265 #else
266         snprintf(prefix, 30, "x-%016llx-", session);
267 #endif
268         struct String_vector vectorst;
269         vectorst.data = NULL;
270         vectorst.count = 0;
271         ret = ZCONNECTIONLOSS;
272         ret = retry_getchildren(zh, path, &vectorst, ts, retry);
273         if (ret != ZOK)
274             return ret;
275         struct String_vector *vector = &vectorst;
276         mutex->id = lookupnode(vector, prefix);
277         free_String_vector(vector);
278         if (mutex->id == NULL) {
279             int len = strlen(path) + strlen(prefix) + 2;
280             char buf[len];
281             char retbuf[len+20];
282             snprintf(buf, len, "%s/%s", path, prefix);
283             ret = ZCONNECTIONLOSS;
284             ret = zoo_create(zh, buf, NULL, 0,  mutex->acl,
285                              ZOO_EPHEMERAL|ZOO_SEQUENCE, retbuf, (len+20));
286 
287             // do not want to retry the create since
288             // we would end up creating more than one child
289             if (ret != ZOK) {
290                 LOG_WARN(LOGCALLBACK(zh), "could not create zoo node %s", buf);
291                 return ret;
292             }
293             mutex->id = getName(retbuf);
294         }
295 
296         if (mutex->id != NULL) {
297             ret = ZCONNECTIONLOSS;
298             ret = retry_getchildren(zh, path, vector, ts, retry);
299             if (ret != ZOK) {
300                 LOG_WARN(LOGCALLBACK(zh), ("could not connect to server"));
301                 return ret;
302             }
303             //sort this list
304             sort_children(vector);
305             owner_id = vector->data[0];
306             mutex->ownerid = strdup(owner_id);
307             id = mutex->id;
308             char* lessthanme = child_floor(vector->data, vector->count, id);
309             if (lessthanme != NULL) {
310                 int flen = strlen(mutex->path) + strlen(lessthanme) + 2;
311                 char last_child[flen];
312                 sprintf(last_child, "%s/%s",mutex->path, lessthanme);
313                 ret = ZCONNECTIONLOSS;
314                 ret = retry_zoowexists(zh, last_child, &lock_watcher_fn, mutex,
315                                        &stat, ts, retry);
316                 // cannot watch my predecessor i am giving up
317                 // we need to be able to watch the predecessor
318                 // since if we do not become a leader the others
319                 // will keep waiting
320                 if (ret != ZOK) {
321                     free_String_vector(vector);
322                     LOG_WARN(LOGCALLBACK(zh), ("unable to watch my predecessor"));
323                     ret = _zkr_lock_unlock_nolock(mutex);
324                     while (ret == 0) {
325                         //we have to give up our leadership
326                         // since we cannot watch out predecessor
327                         ret = _zkr_lock_unlock_nolock(mutex);
328                     }
329                     return ret;
330                 }
331                 // we are not the owner of the lock
332                 mutex->isOwner = 0;
333             }
334             else {
335                 // this is the case when we are the owner
336                 // of the lock
337                 if (strcmp(mutex->id, owner_id) == 0) {
338                     LOG_DEBUG(LOGCALLBACK(zh), "got the zoo lock owner - %s", mutex->id);
339                     mutex->isOwner = 1;
340                     if (mutex->completion != NULL) {
341                         mutex->completion(0, mutex->cbdata);
342                     }
343                     return ZOK;
344                 }
345             }
346             free_String_vector(vector);
347             return ZOK;
348         }
349     } while (mutex->id == NULL);
350     return ZOK;
351 }
352 
zkr_lock_lock(zkr_lock_mutex_t * mutex)353 ZOOAPI int zkr_lock_lock(zkr_lock_mutex_t *mutex) {
354     pthread_mutex_lock(&(mutex->pmutex));
355     zhandle_t *zh = mutex->zh;
356     char *path = mutex->path;
357     struct Stat stat;
358     int exists = zoo_exists(zh, path, 0, &stat);
359     int count = 0;
360     struct timespec ts;
361     ts.tv_sec = 0;
362     ts.tv_nsec = (.5)*1000000;
363     // retry to see if the path exists and
364     // and create if the path does not exist
365     while ((exists == ZCONNECTIONLOSS || exists == ZNONODE) && (count <4)) {
366         count++;
367         // retry the operation
368         if (exists == ZCONNECTIONLOSS)
369             exists = zoo_exists(zh, path, 0, &stat);
370         else if (exists == ZNONODE)
371             exists = zoo_create(zh, path, NULL, 0, mutex->acl, 0, NULL, 0);
372         nanosleep(&ts, 0);
373 
374     }
375 
376     // need to check if we cannot still access the server
377     int check_retry = ZCONNECTIONLOSS;
378     count = 0;
379     while (check_retry != ZOK && count <4) {
380         check_retry = zkr_lock_operation(mutex, &ts);
381         if (check_retry != ZOK) {
382             nanosleep(&ts, 0);
383             count++;
384         }
385     }
386     pthread_mutex_unlock(&(mutex->pmutex));
387     return 0;
388 }
389 
390 
zkr_lock_getpath(zkr_lock_mutex_t * mutex)391 ZOOAPI char* zkr_lock_getpath(zkr_lock_mutex_t *mutex) {
392     return mutex->path;
393 }
394 
zkr_lock_isowner(zkr_lock_mutex_t * mutex)395 ZOOAPI int zkr_lock_isowner(zkr_lock_mutex_t *mutex) {
396     return (mutex->id != NULL && mutex->ownerid != NULL
397             && (strcmp(mutex->id, mutex->ownerid) == 0));
398 }
399 
zkr_lock_getid(zkr_lock_mutex_t * mutex)400 ZOOAPI char* zkr_lock_getid(zkr_lock_mutex_t *mutex) {
401     return mutex->ownerid;
402 }
403 
zkr_lock_destroy(zkr_lock_mutex_t * mutex)404 ZOOAPI int zkr_lock_destroy(zkr_lock_mutex_t* mutex) {
405     if (mutex->id)
406         free(mutex->id);
407     mutex->path = NULL;
408     mutex->acl = NULL;
409     mutex->completion = NULL;
410     pthread_mutex_destroy(&(mutex->pmutex));
411     mutex->isOwner = 0;
412     if (mutex->ownerid)
413         free(mutex->ownerid);
414     return 0;
415 }
416 
417