/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18
#ifdef DLL_EXPORT
#define USE_STATIC_LIB
#endif

#if defined(__CYGWIN__)
#define USE_IPV6
#endif

#include <inttypes.h>
#include <limits.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <time.h>

#include <zookeeper_log.h>
#include <zoo_lock.h>

#ifdef HAVE_SYS_UTSNAME_H
#include <sys/utsname.h>
#endif

#ifdef HAVE_GETPWUID_R
#include <pwd.h>
#endif
44
/*
 * Run statement `x` only when the global log level is exactly DEBUG.
 * The do { } while (0) wrapper makes `IF_DEBUG(...);` a single statement,
 * so it is safe inside an unbraced if/else (the previous bare `if {...}`
 * expansion turned a following `else` into a syntax error).
 */
#define IF_DEBUG(x) do { if (logLevel == ZOO_LOG_LEVEL_DEBUG) { x; } } while (0)
46
47
zkr_lock_init(zkr_lock_mutex_t * mutex,zhandle_t * zh,char * path,struct ACL_vector * acl)48 ZOOAPI int zkr_lock_init(zkr_lock_mutex_t* mutex, zhandle_t* zh,
49 char* path, struct ACL_vector *acl) {
50 mutex->zh = zh;
51 mutex->path = path;
52 mutex->acl = acl;
53 mutex->completion = NULL;
54 mutex->cbdata = NULL;
55 mutex->id = NULL;
56 mutex->ownerid = NULL;
57 mutex->isOwner = 0;
58 pthread_mutex_init(&(mutex->pmutex), NULL);
59 return 0;
60 }
61
zkr_lock_init_cb(zkr_lock_mutex_t * mutex,zhandle_t * zh,char * path,struct ACL_vector * acl,zkr_lock_completion completion,void * cbdata)62 ZOOAPI int zkr_lock_init_cb(zkr_lock_mutex_t *mutex, zhandle_t* zh,
63 char *path, struct ACL_vector *acl,
64 zkr_lock_completion completion, void* cbdata) {
65 mutex->zh = zh;
66 mutex->path = path;
67 mutex->acl = acl;
68 mutex->completion = completion;
69 mutex->cbdata = cbdata;
70 mutex->isOwner = 0;
71 mutex->ownerid = NULL;
72 mutex->id = NULL;
73 pthread_mutex_init(&(mutex->pmutex), NULL);
74 return 0;
75 }
76
/*
 * Delete this client's lock node (<mutex->path>/<mutex->id>) without taking
 * mutex->pmutex. The callers in this file already hold it.
 *
 * The delete is attempted up to 3 times while it fails with ZCONNECTIONLOSS.
 * ZNONODE counts as success because the node is gone either way. On success the
 * completion callback (if any) is invoked with 1, mutex->id is freed and
 * cleared, and isOwner is reset.
 *
 * @return 0 on success, ZSYSTEMERROR if no node is recorded in mutex->id,
 *         otherwise the error returned by the last zoo_delete attempt
 *         (previously every failure was reported as ZCONNECTIONLOSS).
 */
static int _zkr_lock_unlock_nolock(zkr_lock_mutex_t *mutex) {
    zhandle_t *zh = mutex->zh;
    if (mutex->id == NULL) {
        return ZSYSTEMERROR;
    }

    size_t len = strlen(mutex->path) + strlen(mutex->id) + 2;
    char buf[len];
    snprintf(buf, len, "%s/%s", mutex->path, mutex->id);

    /* pause of 500000 ns (0.5 ms) after each connection loss */
    struct timespec ts = { .tv_sec = 0, .tv_nsec = 500000 };
    int ret = ZCONNECTIONLOSS;
    for (int attempt = 0; attempt < 3 && ret == ZCONNECTIONLOSS; attempt++) {
        ret = zoo_delete(zh, buf, -1);
        if (ret == ZCONNECTIONLOSS) {
            LOG_DEBUG(LOGCALLBACK(zh), ("connectionloss while deleting the node"));
            nanosleep(&ts, NULL);
        }
    }

    if (ret == ZOK || ret == ZNONODE) {
        zkr_lock_completion completion = mutex->completion;
        if (completion != NULL) {
            completion(1, mutex->cbdata);
        }
        free(mutex->id);
        mutex->id = NULL;
        /* we no longer have a node, so we cannot be the owner */
        mutex->isOwner = 0;
        return 0;
    }

    if (ret == ZCONNECTIONLOSS) {
        LOG_WARN(LOGCALLBACK(zh), ("not able to connect to server - giving up"));
    } else {
        LOG_WARN(LOGCALLBACK(zh), "could not delete lock node %s: error %d", buf, ret);
    }
    return ret;
}
113 /**
114 * unlock the mutex
115 */
zkr_lock_unlock(zkr_lock_mutex_t * mutex)116 ZOOAPI int zkr_lock_unlock(zkr_lock_mutex_t *mutex) {
117 int ret = 0;
118 pthread_mutex_lock(&(mutex->pmutex));
119 ret = _zkr_lock_unlock_nolock(mutex);
120 pthread_mutex_unlock(&(mutex->pmutex));
121 return ret;
122 }
123
free_String_vector(struct String_vector * v)124 static void free_String_vector(struct String_vector *v) {
125 if (v->data) {
126 int32_t i;
127 for (i=0; i<v->count; i++) {
128 free(v->data[i]);
129 }
130 free(v->data);
131 v->data = 0;
132 }
133 }
134
/*
 * Compare two lock-node names by the text after their last '-'. For nodes
 * created by this file that is the sequence number appended by
 * ZOO_SEQUENCE. A name containing no '-' is compared as a whole. The previous
 * version computed strrchr(...) + 1 on a NULL result (undefined behaviour)
 * whenever a foreign child without '-' existed in the lock directory.
 */
static int strcmp_suffix(const char *str1, const char *str2) {
    const char *dash1 = strrchr(str1, '-');
    const char *dash2 = strrchr(str2, '-');
    const char *suffix1 = (dash1 != NULL) ? dash1 + 1 : str1;
    const char *suffix2 = (dash2 != NULL) ? dash2 + 1 : str2;
    return strcmp(suffix1, suffix2);
}
138
/*
 * qsort comparator for an array of char*: each argument points at one array
 * element, so it is dereferenced once before comparing the names by suffix.
 */
static int vstrcmp(const void* str1, const void* str2) {
    const char *lhs = *(const char * const *) str1;
    const char *rhs = *(const char * const *) str2;
    return strcmp_suffix(lhs, rhs);
}
144
sort_children(struct String_vector * vector)145 static void sort_children(struct String_vector *vector) {
146 qsort( vector->data, vector->count, sizeof(char*), &vstrcmp);
147 }
148
/*
 * Binary-search `sorted_data` (sorted by strcmp_suffix) for `element` and
 * return the entry immediately before it. Returns NULL when `element` is the
 * first entry or is not present at all.
 */
static char* child_floor(char **sorted_data, int len, char *element) {
    int lo = 0;
    int hi = len - 1;

    while (lo <= hi) {
        int mid = lo + (hi - lo) / 2;
        int order = strcmp_suffix(sorted_data[mid], element);
        if (order == 0) {
            return (mid > 0) ? sorted_data[mid - 1] : NULL;
        }
        if (order < 0) {
            lo = mid + 1;
        } else {
            hi = mid - 1;
        }
    }
    return NULL;
}
171
/*
 * Watcher registered on our predecessor's node (see zkr_lock_operation).
 * The event details are not inspected: any notification simply re-runs
 * zkr_lock_lock for the mutex passed as the watcher context.
 */
static void lock_watcher_fn(zhandle_t* zh, int type, int state,
        const char* path, void *watcherCtx) {
    zkr_lock_mutex_t *mutex = (zkr_lock_mutex_t *) watcherCtx;
    (void) zh;
    (void) type;
    (void) state;
    (void) path;
    zkr_lock_lock(mutex);
}
178
/**
 * Return a newly allocated copy of the last path component of `str`
 * (everything after the final '/'), or NULL when `str` contains no '/'.
 * The caller owns and must free the result.
 */
static char* getName(char* str) {
    const char *slash = strrchr(str, '/');
    return (slash == NULL) ? NULL : strdup(slash + 1);
}
188
189 /**
190 * just a method to retry get children
191 */
retry_getchildren(zhandle_t * zh,char * path,struct String_vector * vector,struct timespec * ts,int retry)192 static int retry_getchildren(zhandle_t *zh, char* path, struct String_vector *vector,
193 struct timespec *ts, int retry) {
194 int ret = ZCONNECTIONLOSS;
195 int count = 0;
196 while (ret == ZCONNECTIONLOSS && count < retry) {
197 ret = zoo_get_children(zh, path, 0, vector);
198 if (ret == ZCONNECTIONLOSS) {
199 LOG_DEBUG(LOGCALLBACK(zh), ("connection loss to the server"));
200 nanosleep(ts, 0);
201 count++;
202 }
203 }
204 return ret;
205 }
206
207 /** see if our node already exists
208 * if it does then we dup the name and
209 * return it
210 */
lookupnode(struct String_vector * vector,char * prefix)211 static char* lookupnode(struct String_vector *vector, char *prefix) {
212 char *ret = NULL;
213 if (vector->data) {
214 int i = 0;
215 for (i = 0; i < vector->count; i++) {
216 char* child = vector->data[i];
217 if (strncmp(prefix, child, strlen(prefix)) == 0) {
218 ret = strdup(child);
219 break;
220 }
221 }
222 }
223 return ret;
224 }
225
226 /** retry zoo_wexists
227 */
retry_zoowexists(zhandle_t * zh,char * path,watcher_fn watcher,void * ctx,struct Stat * stat,struct timespec * ts,int retry)228 static int retry_zoowexists(zhandle_t *zh, char* path, watcher_fn watcher, void* ctx,
229 struct Stat *stat, struct timespec *ts, int retry) {
230 int ret = ZCONNECTIONLOSS;
231 int count = 0;
232 while (ret == ZCONNECTIONLOSS && count < retry) {
233 ret = zoo_wexists(zh, path, watcher, ctx, stat);
234 if (ret == ZCONNECTIONLOSS) {
235 LOG_DEBUG(LOGCALLBACK(zh), ("connectionloss while setting watch on my predecessor"));
236 nanosleep(ts, 0);
237 count++;
238 }
239 }
240 return ret;
241 }
242
243 /**
244 * the main code that does the zookeeper leader
245 * election. this code creates its own ephemeral
246 * node on the given path and sees if its the first
247 * one on the list and claims to be a leader if and only
248 * if its the first one of children in the paretn path
249 */
zkr_lock_operation(zkr_lock_mutex_t * mutex,struct timespec * ts)250 static int zkr_lock_operation(zkr_lock_mutex_t *mutex, struct timespec *ts) {
251 zhandle_t *zh = mutex->zh;
252 char *path = mutex->path;
253 char *id = mutex->id;
254 struct Stat stat;
255 char* owner_id = NULL;
256 int retry = 3;
257 do {
258 const clientid_t *cid = zoo_client_id(zh);
259 // get the session id
260 int64_t session = cid->client_id;
261 char prefix[30];
262 int ret = 0;
263 #if defined(__x86_64__)
264 snprintf(prefix, 30, "x-%016lx-", session);
265 #else
266 snprintf(prefix, 30, "x-%016llx-", session);
267 #endif
268 struct String_vector vectorst;
269 vectorst.data = NULL;
270 vectorst.count = 0;
271 ret = ZCONNECTIONLOSS;
272 ret = retry_getchildren(zh, path, &vectorst, ts, retry);
273 if (ret != ZOK)
274 return ret;
275 struct String_vector *vector = &vectorst;
276 mutex->id = lookupnode(vector, prefix);
277 free_String_vector(vector);
278 if (mutex->id == NULL) {
279 int len = strlen(path) + strlen(prefix) + 2;
280 char buf[len];
281 char retbuf[len+20];
282 snprintf(buf, len, "%s/%s", path, prefix);
283 ret = ZCONNECTIONLOSS;
284 ret = zoo_create(zh, buf, NULL, 0, mutex->acl,
285 ZOO_EPHEMERAL|ZOO_SEQUENCE, retbuf, (len+20));
286
287 // do not want to retry the create since
288 // we would end up creating more than one child
289 if (ret != ZOK) {
290 LOG_WARN(LOGCALLBACK(zh), "could not create zoo node %s", buf);
291 return ret;
292 }
293 mutex->id = getName(retbuf);
294 }
295
296 if (mutex->id != NULL) {
297 ret = ZCONNECTIONLOSS;
298 ret = retry_getchildren(zh, path, vector, ts, retry);
299 if (ret != ZOK) {
300 LOG_WARN(LOGCALLBACK(zh), ("could not connect to server"));
301 return ret;
302 }
303 //sort this list
304 sort_children(vector);
305 owner_id = vector->data[0];
306 mutex->ownerid = strdup(owner_id);
307 id = mutex->id;
308 char* lessthanme = child_floor(vector->data, vector->count, id);
309 if (lessthanme != NULL) {
310 int flen = strlen(mutex->path) + strlen(lessthanme) + 2;
311 char last_child[flen];
312 sprintf(last_child, "%s/%s",mutex->path, lessthanme);
313 ret = ZCONNECTIONLOSS;
314 ret = retry_zoowexists(zh, last_child, &lock_watcher_fn, mutex,
315 &stat, ts, retry);
316 // cannot watch my predecessor i am giving up
317 // we need to be able to watch the predecessor
318 // since if we do not become a leader the others
319 // will keep waiting
320 if (ret != ZOK) {
321 free_String_vector(vector);
322 LOG_WARN(LOGCALLBACK(zh), ("unable to watch my predecessor"));
323 ret = _zkr_lock_unlock_nolock(mutex);
324 while (ret == 0) {
325 //we have to give up our leadership
326 // since we cannot watch out predecessor
327 ret = _zkr_lock_unlock_nolock(mutex);
328 }
329 return ret;
330 }
331 // we are not the owner of the lock
332 mutex->isOwner = 0;
333 }
334 else {
335 // this is the case when we are the owner
336 // of the lock
337 if (strcmp(mutex->id, owner_id) == 0) {
338 LOG_DEBUG(LOGCALLBACK(zh), "got the zoo lock owner - %s", mutex->id);
339 mutex->isOwner = 1;
340 if (mutex->completion != NULL) {
341 mutex->completion(0, mutex->cbdata);
342 }
343 return ZOK;
344 }
345 }
346 free_String_vector(vector);
347 return ZOK;
348 }
349 } while (mutex->id == NULL);
350 return ZOK;
351 }
352
zkr_lock_lock(zkr_lock_mutex_t * mutex)353 ZOOAPI int zkr_lock_lock(zkr_lock_mutex_t *mutex) {
354 pthread_mutex_lock(&(mutex->pmutex));
355 zhandle_t *zh = mutex->zh;
356 char *path = mutex->path;
357 struct Stat stat;
358 int exists = zoo_exists(zh, path, 0, &stat);
359 int count = 0;
360 struct timespec ts;
361 ts.tv_sec = 0;
362 ts.tv_nsec = (.5)*1000000;
363 // retry to see if the path exists and
364 // and create if the path does not exist
365 while ((exists == ZCONNECTIONLOSS || exists == ZNONODE) && (count <4)) {
366 count++;
367 // retry the operation
368 if (exists == ZCONNECTIONLOSS)
369 exists = zoo_exists(zh, path, 0, &stat);
370 else if (exists == ZNONODE)
371 exists = zoo_create(zh, path, NULL, 0, mutex->acl, 0, NULL, 0);
372 nanosleep(&ts, 0);
373
374 }
375
376 // need to check if we cannot still access the server
377 int check_retry = ZCONNECTIONLOSS;
378 count = 0;
379 while (check_retry != ZOK && count <4) {
380 check_retry = zkr_lock_operation(mutex, &ts);
381 if (check_retry != ZOK) {
382 nanosleep(&ts, 0);
383 count++;
384 }
385 }
386 pthread_mutex_unlock(&(mutex->pmutex));
387 return 0;
388 }
389
390
zkr_lock_getpath(zkr_lock_mutex_t * mutex)391 ZOOAPI char* zkr_lock_getpath(zkr_lock_mutex_t *mutex) {
392 return mutex->path;
393 }
394
zkr_lock_isowner(zkr_lock_mutex_t * mutex)395 ZOOAPI int zkr_lock_isowner(zkr_lock_mutex_t *mutex) {
396 return (mutex->id != NULL && mutex->ownerid != NULL
397 && (strcmp(mutex->id, mutex->ownerid) == 0));
398 }
399
zkr_lock_getid(zkr_lock_mutex_t * mutex)400 ZOOAPI char* zkr_lock_getid(zkr_lock_mutex_t *mutex) {
401 return mutex->ownerid;
402 }
403
zkr_lock_destroy(zkr_lock_mutex_t * mutex)404 ZOOAPI int zkr_lock_destroy(zkr_lock_mutex_t* mutex) {
405 if (mutex->id)
406 free(mutex->id);
407 mutex->path = NULL;
408 mutex->acl = NULL;
409 mutex->completion = NULL;
410 pthread_mutex_destroy(&(mutex->pmutex));
411 mutex->isOwner = 0;
412 if (mutex->ownerid)
413 free(mutex->ownerid);
414 return 0;
415 }
416
417