1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 #ifdef HAVE_CONFIG_H
20 #include "config.h"
21 #endif
22 
23 #ifdef DLL_EXPORT
24 #define USE_STATIC_LIB
25 #endif
26 
27 #if defined(__CYGWIN__)
28 #define USE_IPV6
29 #endif
30 
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <zookeeper_log.h>
#include <time.h>
#include <sys/time.h>
#include <sys/socket.h>
#include <limits.h>
#include <zoo_lock.h>
#include <stdbool.h>
#include <inttypes.h>
#ifdef HAVE_SYS_UTSNAME_H
#include <sys/utsname.h>
#endif

#ifdef HAVE_GETPWUID_R
#include <pwd.h>
#endif
48 
/* Evaluate x only when the log level is DEBUG. Wrapped in do/while(0) so the
 * macro behaves as a single statement (safe inside if/else without braces). */
#define IF_DEBUG(x) do { if (logLevel == ZOO_LOG_LEVEL_DEBUG) { x; } } while (0)
50 
51 
52 ZOOAPI int zkr_lock_init(zkr_lock_mutex_t* mutex, zhandle_t* zh,
53                       char* path, struct ACL_vector *acl) {
54     mutex->zh = zh;
55     mutex->path = path;
56     mutex->acl = acl;
57     mutex->completion = NULL;
58     mutex->cbdata = NULL;
59     mutex->id = NULL;
60     mutex->ownerid = NULL;
61     mutex->isOwner = 0;
62     pthread_mutex_init(&(mutex->pmutex), NULL);
63     return 0;
64 }
65 
66 ZOOAPI int zkr_lock_init_cb(zkr_lock_mutex_t *mutex, zhandle_t* zh,
67                          char *path, struct ACL_vector *acl,
68                          zkr_lock_completion completion, void* cbdata) {
69     mutex->zh = zh;
70     mutex->path = path;
71     mutex->acl = acl;
72     mutex->completion = completion;
73     mutex->cbdata = cbdata;
74     mutex->isOwner = 0;
75     mutex->ownerid = NULL;
76     mutex->id = NULL;
77     pthread_mutex_init(&(mutex->pmutex), NULL);
78     return 0;
79 }
80 
81 /**
82  * unlock the mutex
83  */
84 ZOOAPI int zkr_lock_unlock(zkr_lock_mutex_t *mutex) {
85     pthread_mutex_lock(&(mutex->pmutex));
86     zhandle_t *zh = mutex->zh;
87     if (mutex->id != NULL) {
88         int len = strlen(mutex->path) + strlen(mutex->id) + 2;
89         char buf[len];
90         sprintf(buf, "%s/%s", mutex->path, mutex->id);
91         int ret = 0;
92         int count = 0;
93         struct timespec ts;
94         ts.tv_sec = 0;
95         ts.tv_nsec = (.5)*1000000;
96         ret = ZCONNECTIONLOSS;
97         while (ret == ZCONNECTIONLOSS && (count < 3)) {
98             ret = zoo_delete(zh, buf, -1);
99             if (ret == ZCONNECTIONLOSS) {
100 #if defined(ZOO_VERSION) || (ZOO_MAJOR_VERSION>=3 && ZOO_MINOR_VERSION>=5)
101                 LOG_DEBUG(LOGCALLBACK(zh), ("connectionloss while deleting the node"));
102 #else
103                 LOG_DEBUG(("connectionloss while deleting the node"));
104 #endif
105                 nanosleep(&ts, 0);
106                 count++;
107             }
108         }
109         if (ret == ZOK || ret == ZNONODE) {
110             zkr_lock_completion completion = mutex->completion;
111             if (completion != NULL) {
112                 completion(1, mutex->cbdata);
113             }
114 
115             free(mutex->id);
116             mutex->id = NULL;
117             pthread_mutex_unlock(&(mutex->pmutex));
118             return 0;
119         }
120 #if defined(ZOO_VERSION) || (ZOO_MAJOR_VERSION>=3 && ZOO_MINOR_VERSION>=5)
121         LOG_WARN(LOGCALLBACK(zh), ("not able to connect to server - giving up"));
122 #else
123         LOG_WARN(("not able to connect to server - giving up"));
124 #endif
125         pthread_mutex_unlock(&(mutex->pmutex));
126         return ZCONNECTIONLOSS;
127     }
128     pthread_mutex_unlock(&(mutex->pmutex));
129     return ZSYSTEMERROR;
130 }
131 
132 static void free_String_vector(struct String_vector *v) {
133     if (v->data) {
134         int32_t i;
135         for (i=0; i<v->count; i++) {
136             free(v->data[i]);
137         }
138         free(v->data);
139         v->data = 0;
140     }
141 }
142 
/**
 * qsort comparator for child node names (elements are char *).
 *
 * Orders names by the sequence suffix after their last '-', ignoring the
 * session-id part, so the node created first sorts first. A name without
 * any '-' is compared in full instead of reading through a NULL pointer.
 */
static int vstrcmp(const void* str1, const void* str2) {
    const char *a = *(const char *const *) str1;
    const char *b = *(const char *const *) str2;
    const char *dash_a = strrchr(a, '-');
    const char *dash_b = strrchr(b, '-');
    return strcmp(dash_a != NULL ? dash_a + 1 : a,
                  dash_b != NULL ? dash_b + 1 : b);
}
148 
149 static void sort_children(struct String_vector *vector) {
150     qsort( vector->data, vector->count, sizeof(char*), &vstrcmp);
151 }
152 
/* Sequence part of a lock node name: the text after its last '-', or the
 * whole name when it contains no '-'. */
static const char *child_sequence(const char *name) {
    const char *dash = strrchr(name, '-');
    return dash != NULL ? dash + 1 : name;
}

/**
 * Find the child immediately preceding `element` in lock order.
 *
 * Lock order is the sequence suffix (the same key vstrcmp sorts by). The
 * session-id part of the name must not take part in the comparison: with a
 * whole-name strcmp, a node from a "larger" session could be skipped, and the
 * waiter would then find no predecessor to watch even though it is not first.
 *
 * @param sorted_data children names (normally sorted by sort_children; the
 *                    result does not depend on the order)
 * @param len         number of entries in sorted_data
 * @param element     our own node name
 * @return the entry with the greatest sequence smaller than element's, or
 *         NULL when no entry has a smaller sequence.
 */
static char* child_floor(char **sorted_data, int len, char *element) {
    const char *mine = child_sequence(element);
    char *ret = NULL;
    for (int i = 0; i < len; i++) {
        const char *seq = child_sequence(sorted_data[i]);
        if (strcmp(seq, mine) < 0 &&
            (ret == NULL || strcmp(seq, child_sequence(ret)) > 0)) {
            ret = sorted_data[i];
        }
    }
    return ret;
}
163 
164 static void lock_watcher_fn(zhandle_t* zh, int type, int state,
165                             const char* path, void *watcherCtx) {
166     //callback that we registered
167     //should be called
168     zkr_lock_lock((zkr_lock_mutex_t*) watcherCtx);
169 }
170 
/**
 * Return a heap copy of the last path component (the text after the final
 * '/'), or NULL when str contains no '/'. The caller frees the result.
 */
static char* getName(char* str) {
    char *last_slash = strrchr(str, '/');
    return last_slash != NULL ? strdup(last_slash + 1) : NULL;
}
180 
181 /**
182  * just a method to retry get children
183  */
184 static int retry_getchildren(zhandle_t *zh, char* path, struct String_vector *vector,
185                              struct timespec *ts, int retry) {
186     int ret = ZCONNECTIONLOSS;
187     int count = 0;
188     while (ret == ZCONNECTIONLOSS && count < retry) {
189         ret = zoo_get_children(zh, path, 0, vector);
190         if (ret == ZCONNECTIONLOSS) {
191 #if defined(ZOO_VERSION) || (ZOO_MAJOR_VERSION>=3 && ZOO_MINOR_VERSION>=5)
192             LOG_DEBUG(LOGCALLBACK(zh), ("connection loss to the server"));
193 #else
194             LOG_DEBUG(("connection loss to the server"));
195 #endif
196             nanosleep(ts, 0);
197             count++;
198         }
199     }
200     return ret;
201 }
202 
203 /** see if our node already exists
204  * if it does then we dup the name and
205  * return it
206  */
207 static char* lookupnode(struct String_vector *vector, char *prefix) {
208     char *ret = NULL;
209     if (vector->data) {
210         int i = 0;
211         for (i = 0; i < vector->count; i++) {
212             char* child = vector->data[i];
213             if (strncmp(prefix, child, strlen(prefix)) == 0) {
214                 ret = strdup(child);
215                 break;
216             }
217         }
218     }
219     return ret;
220 }
221 
222 /** retry zoo_wexists
223  */
224 static int retry_zoowexists(zhandle_t *zh, char* path, watcher_fn watcher, void* ctx,
225                             struct Stat *stat, struct timespec *ts, int retry) {
226     int ret = ZCONNECTIONLOSS;
227     int count = 0;
228     while (ret == ZCONNECTIONLOSS && count < retry) {
229         ret = zoo_wexists(zh, path, watcher, ctx, stat);
230         if (ret == ZCONNECTIONLOSS) {
231 #if defined(ZOO_VERSION) || (ZOO_MAJOR_VERSION>=3 && ZOO_MINOR_VERSION>=5)
232             LOG_DEBUG(LOGCALLBACK(zh), ("connectionloss while setting watch on my predecessor"));
233 #else
234             LOG_DEBUG(("connectionloss while setting watch on my predecessor"));
235 #endif
236             nanosleep(ts, 0);
237             count++;
238         }
239     }
240     return ret;
241 }
242 
243 /**
244  * the main code that does the zookeeper leader
245  * election. this code creates its own ephemeral
246  * node on the given path and sees if its the first
247  * one on the list and claims to be a leader if and only
248  * if its the first one of children in the paretn path
249  */
250 static int zkr_lock_operation(zkr_lock_mutex_t *mutex, struct timespec *ts) {
251     zhandle_t *zh = mutex->zh;
252     char *path = mutex->path;
253     char *id = mutex->id;
254     struct Stat stat;
255     char* owner_id = NULL;
256     int retry = 3;
257     do {
258         const clientid_t *cid = zoo_client_id(zh);
259         // get the session id
260         int64_t session = cid->client_id;
261         char prefix[30];
262         int ret = 0;
263 #if defined(__x86_64__)
264         snprintf(prefix, 30, "x-%016lx-", session);
265 #else
266         snprintf(prefix, 30, "x-%016llx-", session);
267 #endif
268         struct String_vector vectorst;
269         vectorst.data = NULL;
270         vectorst.count = 0;
271         ret = ZCONNECTIONLOSS;
272         ret = retry_getchildren(zh, path, &vectorst, ts, retry);
273         if (ret != ZOK)
274             return ret;
275         struct String_vector *vector = &vectorst;
276         mutex->id = lookupnode(vector, prefix);
277         free_String_vector(vector);
278         if (mutex->id == NULL) {
279             int len = strlen(path) + strlen(prefix) + 2;
280             char buf[len];
281             char retbuf[len+20];
282             snprintf(buf, len, "%s/%s", path, prefix);
283             ret = ZCONNECTIONLOSS;
284             ret = zoo_create(zh, buf, NULL, 0,  mutex->acl,
285                              ZOO_EPHEMERAL|ZOO_SEQUENCE, retbuf, (len+20));
286 
287             // do not want to retry the create since
288             // we would end up creating more than one child
289             if (ret != ZOK) {
290 #if defined(ZOO_VERSION) || (ZOO_MAJOR_VERSION>=3 && ZOO_MINOR_VERSION>=5)
291                 LOG_WARN(LOGCALLBACK(zh), ("could not create zoo node %s", buf));
292 #else
293                 LOG_WARN(("could not create zoo node %s", buf));
294 #endif
295                 return ret;
296             }
297             mutex->id = getName(retbuf);
298         }
299 
300         if (mutex->id != NULL) {
301             ret = ZCONNECTIONLOSS;
302             ret = retry_getchildren(zh, path, vector, ts, retry);
303             if (ret != ZOK) {
304 #if defined(ZOO_VERSION) || (ZOO_MAJOR_VERSION>=3 && ZOO_MINOR_VERSION>=5)
305                 LOG_WARN(LOGCALLBACK(zh), ("could not connect to server"));
306 #else
307                 LOG_WARN(("could not connect to server"));
308 #endif
309                 return ret;
310             }
311             //sort this list
312             sort_children(vector);
313             owner_id = vector->data[0];
314             mutex->ownerid = strdup(owner_id);
315             id = mutex->id;
316             char* lessthanme = child_floor(vector->data, vector->count, id);
317             if (lessthanme != NULL) {
318                 int flen = strlen(mutex->path) + strlen(lessthanme) + 2;
319                 char last_child[flen];
320                 sprintf(last_child, "%s/%s",mutex->path, lessthanme);
321                 ret = ZCONNECTIONLOSS;
322                 ret = retry_zoowexists(zh, last_child, &lock_watcher_fn, mutex,
323                                        &stat, ts, retry);
324                 // cannot watch my predecessor i am giving up
325                 // we need to be able to watch the predecessor
326                 // since if we do not become a leader the others
327                 // will keep waiting
328                 if (ret != ZOK) {
329                     free_String_vector(vector);
330 #if defined(ZOO_VERSION) || (ZOO_MAJOR_VERSION>=3 && ZOO_MINOR_VERSION>=5)
331                     LOG_WARN(LOGCALLBACK(zh), ("unable to watch my predecessor"));
332 #else
333                     LOG_WARN(("unable to watch my predecessor"));
334 #endif
335                     ret = zkr_lock_unlock(mutex);
336                     while (ret == 0) {
337                         //we have to give up our leadership
338                         // since we cannot watch out predecessor
339                         ret = zkr_lock_unlock(mutex);
340                     }
341                     return ret;
342                 }
343                 // we are not the owner of the lock
344                 mutex->isOwner = 0;
345             }
346             else {
347                 // this is the case when we are the owner
348                 // of the lock
349                 if (strcmp(mutex->id, owner_id) == 0) {
350 #if defined(ZOO_VERSION) || (ZOO_MAJOR_VERSION>=3 && ZOO_MINOR_VERSION>=5)
351                     LOG_DEBUG(LOGCALLBACK(zh), ("got the zoo lock owner - %s", mutex->id));
352 #else
353                     LOG_DEBUG(("got the zoo lock owner - %s", mutex->id));
354 #endif
355                     mutex->isOwner = 1;
356                     if (mutex->completion != NULL) {
357                         mutex->completion(0, mutex->cbdata);
358                     }
359                     return ZOK;
360                 }
361             }
362             free_String_vector(vector);
363             return ZOK;
364         }
365     } while (mutex->id == NULL);
366     return ZOK;
367 }
368 
369 ZOOAPI int zkr_lock_lock(zkr_lock_mutex_t *mutex) {
370     pthread_mutex_lock(&(mutex->pmutex));
371     zhandle_t *zh = mutex->zh;
372     char *path = mutex->path;
373     struct Stat stat;
374     int exists = zoo_exists(zh, path, 0, &stat);
375     int count = 0;
376     struct timespec ts;
377     ts.tv_sec = 0;
378     ts.tv_nsec = (.5)*1000000;
379     // retry to see if the path exists and
380     // and create if the path does not exist
381     while ((exists == ZCONNECTIONLOSS || exists == ZNONODE) && (count <4)) {
382         count++;
383         // retry the operation
384         if (exists == ZCONNECTIONLOSS)
385             exists = zoo_exists(zh, path, 0, &stat);
386         else if (exists == ZNONODE)
387             exists = zoo_create(zh, path, NULL, 0, mutex->acl, 0, NULL, 0);
388         nanosleep(&ts, 0);
389 
390     }
391 
392     // need to check if we cannot still access the server
393     int check_retry = ZCONNECTIONLOSS;
394     count = 0;
395     while (check_retry != ZOK && count <4) {
396         check_retry = zkr_lock_operation(mutex, &ts);
397         if (check_retry != ZOK) {
398             nanosleep(&ts, 0);
399             count++;
400         }
401     }
402     pthread_mutex_unlock(&(mutex->pmutex));
403     return zkr_lock_isowner(mutex);
404 }
405 
406 
407 ZOOAPI char* zkr_lock_getpath(zkr_lock_mutex_t *mutex) {
408     return mutex->path;
409 }
410 
411 ZOOAPI int zkr_lock_isowner(zkr_lock_mutex_t *mutex) {
412     return (mutex->id != NULL && mutex->ownerid != NULL
413             && (strcmp(mutex->id, mutex->ownerid) == 0));
414 }
415 
416 ZOOAPI char* zkr_lock_getid(zkr_lock_mutex_t *mutex) {
417     return mutex->ownerid;
418 }
419 
420 ZOOAPI int zkr_lock_destroy(zkr_lock_mutex_t* mutex) {
421     if (mutex->id)
422         free(mutex->id);
423     mutex->path = NULL;
424     mutex->acl = NULL;
425     mutex->completion = NULL;
426     pthread_mutex_destroy(&(mutex->pmutex));
427     mutex->isOwner = 0;
428     if (mutex->ownerid)
429         free(mutex->ownerid);
430     return 0;
431 }
432 
433