1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 #ifdef HAVE_CONFIG_H 20 #include "config.h" 21 #endif 22 23 #ifdef DLL_EXPORT 24 #define USE_STATIC_LIB 25 #endif 26 27 #if defined(__CYGWIN__) 28 #define USE_IPV6 29 #endif 30 31 #include <stdlib.h> 32 #include <stdio.h> 33 #include <string.h> 34 #include <zookeeper_log.h> 35 #include <time.h> 36 #include <sys/time.h> 37 #include <sys/socket.h> 38 #include <limits.h> 39 #include <zoo_lock.h> 40 #include <stdbool.h> 41 #ifdef HAVE_SYS_UTSNAME_H 42 #include <sys/utsname.h> 43 #endif 44 45 #ifdef HAVE_GETPWUID_R 46 #include <pwd.h> 47 #endif 48 49 #define IF_DEBUG(x) if (logLevel==ZOO_LOG_LEVEL_DEBUG) {x;} 50 51 52 ZOOAPI int zkr_lock_init(zkr_lock_mutex_t* mutex, zhandle_t* zh, 53 char* path, struct ACL_vector *acl) { 54 mutex->zh = zh; 55 mutex->path = path; 56 mutex->acl = acl; 57 mutex->completion = NULL; 58 mutex->cbdata = NULL; 59 mutex->id = NULL; 60 mutex->ownerid = NULL; 61 mutex->isOwner = 0; 62 pthread_mutex_init(&(mutex->pmutex), NULL); 63 return 0; 64 } 65 66 ZOOAPI int zkr_lock_init_cb(zkr_lock_mutex_t *mutex, zhandle_t* zh, 67 char *path, struct ACL_vector *acl, 68 zkr_lock_completion completion, void* cbdata) { 69 mutex->zh = zh; 70 mutex->path = path; 71 mutex->acl = acl; 72 mutex->completion = completion; 73 mutex->cbdata = cbdata; 74 mutex->isOwner = 0; 75 mutex->ownerid = NULL; 76 mutex->id = NULL; 77 pthread_mutex_init(&(mutex->pmutex), NULL); 78 return 0; 79 } 80 81 /** 82 * unlock the mutex 83 */ 84 ZOOAPI int zkr_lock_unlock(zkr_lock_mutex_t *mutex) { 85 pthread_mutex_lock(&(mutex->pmutex)); 86 zhandle_t *zh = mutex->zh; 87 if (mutex->id != NULL) { 88 int len = strlen(mutex->path) + strlen(mutex->id) + 2; 89 char buf[len]; 90 sprintf(buf, "%s/%s", mutex->path, mutex->id); 91 int ret = 0; 92 int count = 0; 93 struct timespec ts; 94 ts.tv_sec = 0; 95 ts.tv_nsec = (.5)*1000000; 96 ret = ZCONNECTIONLOSS; 97 while (ret == ZCONNECTIONLOSS && (count < 3)) { 98 ret = zoo_delete(zh, buf, -1); 99 if (ret == ZCONNECTIONLOSS) { 100 #if defined(ZOO_VERSION) || (ZOO_MAJOR_VERSION>=3 && ZOO_MINOR_VERSION>=5) 101 LOG_DEBUG(LOGCALLBACK(zh), ("connectionloss while deleting the node")); 102 #else 103 LOG_DEBUG(("connectionloss while deleting the node")); 104 #endif 105 nanosleep(&ts, 0); 106 count++; 107 } 108 } 109 if (ret == ZOK || ret == ZNONODE) { 110 zkr_lock_completion completion = mutex->completion; 111 if (completion != NULL) { 112 completion(1, mutex->cbdata); 113 } 114 115 free(mutex->id); 116 mutex->id = NULL; 117 pthread_mutex_unlock(&(mutex->pmutex)); 118 return 0; 119 } 120 #if defined(ZOO_VERSION) || (ZOO_MAJOR_VERSION>=3 && ZOO_MINOR_VERSION>=5) 121 LOG_WARN(LOGCALLBACK(zh), ("not able to connect to server - giving up")); 122 #else 123 LOG_WARN(("not able to connect to server - giving up")); 124 #endif 125 pthread_mutex_unlock(&(mutex->pmutex)); 126 return ZCONNECTIONLOSS; 127 } 128 pthread_mutex_unlock(&(mutex->pmutex)); 129 return ZSYSTEMERROR; 130 } 131 132 static void free_String_vector(struct String_vector *v) { 133 if (v->data) { 134 int32_t i; 135 for (i=0; i<v->count; i++) { 136 free(v->data[i]); 137 } 138 free(v->data); 139 v->data = 0; 140 } 141 } 142 143 static int vstrcmp(const void* str1, const void* str2) { 144 const char **a = (const char**)str1; 145 const char **b = (const char**) str2; 146 return strcmp(strrchr(*a, '-')+1, strrchr(*b, '-')+1); 147 } 148 149 static void sort_children(struct String_vector *vector) { 150 qsort( vector->data, vector->count, sizeof(char*), &vstrcmp); 151 } 152 153 static char* child_floor(char **sorted_data, int len, char *element) { 154 char* ret = NULL; 155 int i =0; 156 for (i=0; i < len; i++) { 157 if (strcmp(sorted_data[i], element) < 0) { 158 ret = sorted_data[i]; 159 } 160 } 161 return ret; 162 } 163 164 static void lock_watcher_fn(zhandle_t* zh, int type, int state, 165 const char* path, void *watcherCtx) { 166 //callback that we registered 167 //should be called 168 zkr_lock_lock((zkr_lock_mutex_t*) watcherCtx); 169 } 170 171 /** 172 * get the last name of the path 173 */ 174 static char* getName(char* str) { 175 char* name = strrchr(str, '/'); 176 if (name == NULL) 177 return NULL; 178 return strdup(name + 1); 179 } 180 181 /** 182 * just a method to retry get children 183 */ 184 static int retry_getchildren(zhandle_t *zh, char* path, struct String_vector *vector, 185 struct timespec *ts, int retry) { 186 int ret = ZCONNECTIONLOSS; 187 int count = 0; 188 while (ret == ZCONNECTIONLOSS && count < retry) { 189 ret = zoo_get_children(zh, path, 0, vector); 190 if (ret == ZCONNECTIONLOSS) { 191 #if defined(ZOO_VERSION) || (ZOO_MAJOR_VERSION>=3 && ZOO_MINOR_VERSION>=5) 192 LOG_DEBUG(LOGCALLBACK(zh), ("connection loss to the server")); 193 #else 194 LOG_DEBUG(("connection loss to the server")); 195 #endif 196 nanosleep(ts, 0); 197 count++; 198 } 199 } 200 return ret; 201 } 202 203 /** see if our node already exists 204 * if it does then we dup the name and 205 * return it 206 */ 207 static char* lookupnode(struct String_vector *vector, char *prefix) { 208 char *ret = NULL; 209 if (vector->data) { 210 int i = 0; 211 for (i = 0; i < vector->count; i++) { 212 char* child = vector->data[i]; 213 if (strncmp(prefix, child, strlen(prefix)) == 0) { 214 ret = strdup(child); 215 break; 216 } 217 } 218 } 219 return ret; 220 } 221 222 /** retry zoo_wexists 223 */ 224 static int retry_zoowexists(zhandle_t *zh, char* path, watcher_fn watcher, void* ctx, 225 struct Stat *stat, struct timespec *ts, int retry) { 226 int ret = ZCONNECTIONLOSS; 227 int count = 0; 228 while (ret == ZCONNECTIONLOSS && count < retry) { 229 ret = zoo_wexists(zh, path, watcher, ctx, stat); 230 if (ret == ZCONNECTIONLOSS) { 231 #if defined(ZOO_VERSION) || (ZOO_MAJOR_VERSION>=3 && ZOO_MINOR_VERSION>=5) 232 LOG_DEBUG(LOGCALLBACK(zh), ("connectionloss while setting watch on my predecessor")); 233 #else 234 LOG_DEBUG(("connectionloss while setting watch on my predecessor")); 235 #endif 236 nanosleep(ts, 0); 237 count++; 238 } 239 } 240 return ret; 241 } 242 243 /** 244 * the main code that does the zookeeper leader 245 * election. this code creates its own ephemeral 246 * node on the given path and sees if its the first 247 * one on the list and claims to be a leader if and only 248 * if its the first one of children in the paretn path 249 */ 250 static int zkr_lock_operation(zkr_lock_mutex_t *mutex, struct timespec *ts) { 251 zhandle_t *zh = mutex->zh; 252 char *path = mutex->path; 253 char *id = mutex->id; 254 struct Stat stat; 255 char* owner_id = NULL; 256 int retry = 3; 257 do { 258 const clientid_t *cid = zoo_client_id(zh); 259 // get the session id 260 int64_t session = cid->client_id; 261 char prefix[30]; 262 int ret = 0; 263 #if defined(__x86_64__) 264 snprintf(prefix, 30, "x-%016lx-", session); 265 #else 266 snprintf(prefix, 30, "x-%016llx-", session); 267 #endif 268 struct String_vector vectorst; 269 vectorst.data = NULL; 270 vectorst.count = 0; 271 ret = ZCONNECTIONLOSS; 272 ret = retry_getchildren(zh, path, &vectorst, ts, retry); 273 if (ret != ZOK) 274 return ret; 275 struct String_vector *vector = &vectorst; 276 mutex->id = lookupnode(vector, prefix); 277 free_String_vector(vector); 278 if (mutex->id == NULL) { 279 int len = strlen(path) + strlen(prefix) + 2; 280 char buf[len]; 281 char retbuf[len+20]; 282 snprintf(buf, len, "%s/%s", path, prefix); 283 ret = ZCONNECTIONLOSS; 284 ret = zoo_create(zh, buf, NULL, 0, mutex->acl, 285 ZOO_EPHEMERAL|ZOO_SEQUENCE, retbuf, (len+20)); 286 287 // do not want to retry the create since 288 // we would end up creating more than one child 289 if (ret != ZOK) { 290 #if defined(ZOO_VERSION) || (ZOO_MAJOR_VERSION>=3 && ZOO_MINOR_VERSION>=5) 291 LOG_WARN(LOGCALLBACK(zh), ("could not create zoo node %s", buf)); 292 #else 293 LOG_WARN(("could not create zoo node %s", buf)); 294 #endif 295 return ret; 296 } 297 mutex->id = getName(retbuf); 298 } 299 300 if (mutex->id != NULL) { 301 ret = ZCONNECTIONLOSS; 302 ret = retry_getchildren(zh, path, vector, ts, retry); 303 if (ret != ZOK) { 304 #if defined(ZOO_VERSION) || (ZOO_MAJOR_VERSION>=3 && ZOO_MINOR_VERSION>=5) 305 LOG_WARN(LOGCALLBACK(zh), ("could not connect to server")); 306 #else 307 LOG_WARN(("could not connect to server")); 308 #endif 309 return ret; 310 } 311 //sort this list 312 sort_children(vector); 313 owner_id = vector->data[0]; 314 mutex->ownerid = strdup(owner_id); 315 id = mutex->id; 316 char* lessthanme = child_floor(vector->data, vector->count, id); 317 if (lessthanme != NULL) { 318 int flen = strlen(mutex->path) + strlen(lessthanme) + 2; 319 char last_child[flen]; 320 sprintf(last_child, "%s/%s",mutex->path, lessthanme); 321 ret = ZCONNECTIONLOSS; 322 ret = retry_zoowexists(zh, last_child, &lock_watcher_fn, mutex, 323 &stat, ts, retry); 324 // cannot watch my predecessor i am giving up 325 // we need to be able to watch the predecessor 326 // since if we do not become a leader the others 327 // will keep waiting 328 if (ret != ZOK) { 329 free_String_vector(vector); 330 #if defined(ZOO_VERSION) || (ZOO_MAJOR_VERSION>=3 && ZOO_MINOR_VERSION>=5) 331 LOG_WARN(LOGCALLBACK(zh), ("unable to watch my predecessor")); 332 #else 333 LOG_WARN(("unable to watch my predecessor")); 334 #endif 335 ret = zkr_lock_unlock(mutex); 336 while (ret == 0) { 337 //we have to give up our leadership 338 // since we cannot watch out predecessor 339 ret = zkr_lock_unlock(mutex); 340 } 341 return ret; 342 } 343 // we are not the owner of the lock 344 mutex->isOwner = 0; 345 } 346 else { 347 // this is the case when we are the owner 348 // of the lock 349 if (strcmp(mutex->id, owner_id) == 0) { 350 #if defined(ZOO_VERSION) || (ZOO_MAJOR_VERSION>=3 && ZOO_MINOR_VERSION>=5) 351 LOG_DEBUG(LOGCALLBACK(zh), ("got the zoo lock owner - %s", mutex->id)); 352 #else 353 LOG_DEBUG(("got the zoo lock owner - %s", mutex->id)); 354 #endif 355 mutex->isOwner = 1; 356 if (mutex->completion != NULL) { 357 mutex->completion(0, mutex->cbdata); 358 } 359 return ZOK; 360 } 361 } 362 free_String_vector(vector); 363 return ZOK; 364 } 365 } while (mutex->id == NULL); 366 return ZOK; 367 } 368 369 ZOOAPI int zkr_lock_lock(zkr_lock_mutex_t *mutex) { 370 pthread_mutex_lock(&(mutex->pmutex)); 371 zhandle_t *zh = mutex->zh; 372 char *path = mutex->path; 373 struct Stat stat; 374 int exists = zoo_exists(zh, path, 0, &stat); 375 int count = 0; 376 struct timespec ts; 377 ts.tv_sec = 0; 378 ts.tv_nsec = (.5)*1000000; 379 // retry to see if the path exists and 380 // and create if the path does not exist 381 while ((exists == ZCONNECTIONLOSS || exists == ZNONODE) && (count <4)) { 382 count++; 383 // retry the operation 384 if (exists == ZCONNECTIONLOSS) 385 exists = zoo_exists(zh, path, 0, &stat); 386 else if (exists == ZNONODE) 387 exists = zoo_create(zh, path, NULL, 0, mutex->acl, 0, NULL, 0); 388 nanosleep(&ts, 0); 389 390 } 391 392 // need to check if we cannot still access the server 393 int check_retry = ZCONNECTIONLOSS; 394 count = 0; 395 while (check_retry != ZOK && count <4) { 396 check_retry = zkr_lock_operation(mutex, &ts); 397 if (check_retry != ZOK) { 398 nanosleep(&ts, 0); 399 count++; 400 } 401 } 402 pthread_mutex_unlock(&(mutex->pmutex)); 403 return zkr_lock_isowner(mutex); 404 } 405 406 407 ZOOAPI char* zkr_lock_getpath(zkr_lock_mutex_t *mutex) { 408 return mutex->path; 409 } 410 411 ZOOAPI int zkr_lock_isowner(zkr_lock_mutex_t *mutex) { 412 return (mutex->id != NULL && mutex->ownerid != NULL 413 && (strcmp(mutex->id, mutex->ownerid) == 0)); 414 } 415 416 ZOOAPI char* zkr_lock_getid(zkr_lock_mutex_t *mutex) { 417 return mutex->ownerid; 418 } 419 420 ZOOAPI int zkr_lock_destroy(zkr_lock_mutex_t* mutex) { 421 if (mutex->id) 422 free(mutex->id); 423 mutex->path = NULL; 424 mutex->acl = NULL; 425 mutex->completion = NULL; 426 pthread_mutex_destroy(&(mutex->pmutex)); 427 mutex->isOwner = 0; 428 if (mutex->ownerid) 429 free(mutex->ownerid); 430 return 0; 431 } 432 433