1 /* $NetBSD: cluster.c,v 1.1.1.1 2009/12/02 00:27:08 haad Exp $ */ 2 3 /* 4 * Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved. 5 * 6 * This copyrighted material is made available to anyone wishing to use, 7 * modify, copy, or redistribute it subject to the terms and conditions 8 * of the GNU Lesser General Public License v.2.1. 9 * 10 * You should have received a copy of the GNU Lesser General Public License 11 * along with this program; if not, write to the Free Software Foundation, 12 * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 13 */ 14 #include <errno.h> 15 #include <string.h> 16 #include <sys/types.h> 17 #include <unistd.h> 18 #include <stdint.h> 19 #include <stdlib.h> 20 #include <signal.h> 21 #include <sys/socket.h> /* These are for OpenAIS CPGs */ 22 #include <sys/select.h> 23 #include <sys/un.h> 24 #include <netinet/in.h> 25 #include <arpa/inet.h> 26 #include <corosync/corotypes.h> 27 #include <corosync/cpg.h> 28 #include <openais/saAis.h> 29 #include <openais/saCkpt.h> 30 31 #include "dm-log-userspace.h" 32 #include "libdevmapper.h" 33 #include "functions.h" 34 #include "local.h" 35 #include "common.h" 36 #include "logging.h" 37 #include "link_mon.h" 38 #include "cluster.h" 39 40 /* Open AIS error codes */ 41 #define str_ais_error(x) \ 42 ((x) == SA_AIS_OK) ? "SA_AIS_OK" : \ 43 ((x) == SA_AIS_ERR_LIBRARY) ? "SA_AIS_ERR_LIBRARY" : \ 44 ((x) == SA_AIS_ERR_VERSION) ? "SA_AIS_ERR_VERSION" : \ 45 ((x) == SA_AIS_ERR_INIT) ? "SA_AIS_ERR_INIT" : \ 46 ((x) == SA_AIS_ERR_TIMEOUT) ? "SA_AIS_ERR_TIMEOUT" : \ 47 ((x) == SA_AIS_ERR_TRY_AGAIN) ? "SA_AIS_ERR_TRY_AGAIN" : \ 48 ((x) == SA_AIS_ERR_INVALID_PARAM) ? "SA_AIS_ERR_INVALID_PARAM" : \ 49 ((x) == SA_AIS_ERR_NO_MEMORY) ? "SA_AIS_ERR_NO_MEMORY" : \ 50 ((x) == SA_AIS_ERR_BAD_HANDLE) ? "SA_AIS_ERR_BAD_HANDLE" : \ 51 ((x) == SA_AIS_ERR_BUSY) ? "SA_AIS_ERR_BUSY" : \ 52 ((x) == SA_AIS_ERR_ACCESS) ? "SA_AIS_ERR_ACCESS" : \ 53 ((x) == SA_AIS_ERR_NOT_EXIST) ? "SA_AIS_ERR_NOT_EXIST" : \ 54 ((x) == SA_AIS_ERR_NAME_TOO_LONG) ? "SA_AIS_ERR_NAME_TOO_LONG" : \ 55 ((x) == SA_AIS_ERR_EXIST) ? "SA_AIS_ERR_EXIST" : \ 56 ((x) == SA_AIS_ERR_NO_SPACE) ? "SA_AIS_ERR_NO_SPACE" : \ 57 ((x) == SA_AIS_ERR_INTERRUPT) ? "SA_AIS_ERR_INTERRUPT" : \ 58 ((x) == SA_AIS_ERR_NAME_NOT_FOUND) ? "SA_AIS_ERR_NAME_NOT_FOUND" : \ 59 ((x) == SA_AIS_ERR_NO_RESOURCES) ? "SA_AIS_ERR_NO_RESOURCES" : \ 60 ((x) == SA_AIS_ERR_NOT_SUPPORTED) ? "SA_AIS_ERR_NOT_SUPPORTED" : \ 61 ((x) == SA_AIS_ERR_BAD_OPERATION) ? "SA_AIS_ERR_BAD_OPERATION" : \ 62 ((x) == SA_AIS_ERR_FAILED_OPERATION) ? "SA_AIS_ERR_FAILED_OPERATION" : \ 63 ((x) == SA_AIS_ERR_MESSAGE_ERROR) ? "SA_AIS_ERR_MESSAGE_ERROR" : \ 64 ((x) == SA_AIS_ERR_QUEUE_FULL) ? "SA_AIS_ERR_QUEUE_FULL" : \ 65 ((x) == SA_AIS_ERR_QUEUE_NOT_AVAILABLE) ? "SA_AIS_ERR_QUEUE_NOT_AVAILABLE" : \ 66 ((x) == SA_AIS_ERR_BAD_FLAGS) ? "SA_AIS_ERR_BAD_FLAGS" : \ 67 ((x) == SA_AIS_ERR_TOO_BIG) ? "SA_AIS_ERR_TOO_BIG" : \ 68 ((x) == SA_AIS_ERR_NO_SECTIONS) ? "SA_AIS_ERR_NO_SECTIONS" : \ 69 "ais_error_unknown" 70 71 #define DM_ULOG_RESPONSE 0x1000 /* in last byte of 32-bit value */ 72 #define DM_ULOG_CHECKPOINT_READY 21 73 #define DM_ULOG_MEMBER_JOIN 22 74 75 #define _RQ_TYPE(x) \ 76 ((x) == DM_ULOG_CHECKPOINT_READY) ? "DM_ULOG_CHECKPOINT_READY": \ 77 ((x) == DM_ULOG_MEMBER_JOIN) ? "DM_ULOG_MEMBER_JOIN": \ 78 RQ_TYPE((x) & ~DM_ULOG_RESPONSE) 79 80 static uint32_t my_cluster_id = 0xDEAD; 81 static SaCkptHandleT ckpt_handle = 0; 82 static SaCkptCallbacksT callbacks = { 0, 0 }; 83 static SaVersionT version = { 'B', 1, 1 }; 84 85 #define DEBUGGING_HISTORY 100 86 //static char debugging[DEBUGGING_HISTORY][128]; 87 //static int idx = 0; 88 #define LOG_SPRINT(cc, f, arg...) do { \ 89 cc->idx++; \ 90 cc->idx = cc->idx % DEBUGGING_HISTORY; \ 91 sprintf(cc->debugging[cc->idx], f, ## arg); \ 92 } while (0) 93 94 static int log_resp_rec = 0; 95 96 struct checkpoint_data { 97 uint32_t requester; 98 char uuid[CPG_MAX_NAME_LENGTH]; 99 100 int bitmap_size; /* in bytes */ 101 char *sync_bits; 102 char *clean_bits; 103 char *recovering_region; 104 struct checkpoint_data *next; 105 }; 106 107 #define INVALID 0 108 #define VALID 1 109 #define LEAVING 2 110 111 #define MAX_CHECKPOINT_REQUESTERS 10 112 struct clog_cpg { 113 struct dm_list list; 114 115 uint32_t lowest_id; 116 cpg_handle_t handle; 117 struct cpg_name name; 118 uint64_t luid; 119 120 /* Are we the first, or have we received checkpoint? */ 121 int state; 122 int cpg_state; /* FIXME: debugging */ 123 int free_me; 124 int delay; 125 int resend_requests; 126 struct dm_list startup_list; 127 struct dm_list working_list; 128 129 int checkpoints_needed; 130 uint32_t checkpoint_requesters[MAX_CHECKPOINT_REQUESTERS]; 131 struct checkpoint_data *checkpoint_list; 132 int idx; 133 char debugging[DEBUGGING_HISTORY][128]; 134 }; 135 136 static struct dm_list clog_cpg_list; 137 138 /* 139 * cluster_send 140 * @rq 141 * 142 * Returns: 0 on success, -Exxx on error 143 */ 144 int cluster_send(struct clog_request *rq) 145 { 146 int r; 147 int count=0; 148 int found; 149 struct iovec iov; 150 struct clog_cpg *entry; 151 152 dm_list_iterate_items(entry, &clog_cpg_list) 153 if (!strncmp(entry->name.value, rq->u_rq.uuid, 154 CPG_MAX_NAME_LENGTH)) { 155 found = 1; 156 break; 157 } 158 159 if (!found) { 160 rq->u_rq.error = -ENOENT; 161 return -ENOENT; 162 } 163 164 /* 165 * Once the request heads for the cluster, the luid looses 166 * all its meaning. 167 */ 168 rq->u_rq.luid = 0; 169 170 iov.iov_base = rq; 171 iov.iov_len = sizeof(struct clog_request) + rq->u_rq.data_size; 172 173 if (entry->cpg_state != VALID) 174 return -EINVAL; 175 176 do { 177 r = cpg_mcast_joined(entry->handle, CPG_TYPE_AGREED, &iov, 1); 178 if (r != SA_AIS_ERR_TRY_AGAIN) 179 break; 180 count++; 181 if (count < 10) 182 LOG_PRINT("[%s] Retry #%d of cpg_mcast_joined: %s", 183 SHORT_UUID(rq->u_rq.uuid), count, 184 str_ais_error(r)); 185 else if ((count < 100) && !(count % 10)) 186 LOG_ERROR("[%s] Retry #%d of cpg_mcast_joined: %s", 187 SHORT_UUID(rq->u_rq.uuid), count, 188 str_ais_error(r)); 189 else if ((count < 1000) && !(count % 100)) 190 LOG_ERROR("[%s] Retry #%d of cpg_mcast_joined: %s", 191 SHORT_UUID(rq->u_rq.uuid), count, 192 str_ais_error(r)); 193 else if ((count < 10000) && !(count % 1000)) 194 LOG_ERROR("[%s] Retry #%d of cpg_mcast_joined: %s - " 195 "OpenAIS not handling the load?", 196 SHORT_UUID(rq->u_rq.uuid), count, 197 str_ais_error(r)); 198 usleep(1000); 199 } while (1); 200 201 if (r == CPG_OK) 202 return 0; 203 204 /* error codes found in openais/cpg.h */ 205 LOG_ERROR("cpg_mcast_joined error: %s", str_ais_error(r)); 206 207 rq->u_rq.error = -EBADE; 208 return -EBADE; 209 } 210 211 static struct clog_request *get_matching_rq(struct clog_request *rq, 212 struct dm_list *l) 213 { 214 struct clog_request *match, *n; 215 216 dm_list_iterate_items_safe(match, n, l) 217 if (match->u_rq.seq == rq->u_rq.seq) { 218 dm_list_del(&match->list); 219 return match; 220 } 221 222 return NULL; 223 } 224 225 static char rq_buffer[DM_ULOG_REQUEST_SIZE]; 226 static int handle_cluster_request(struct clog_cpg *entry, 227 struct clog_request *rq, int server) 228 { 229 int r = 0; 230 struct clog_request *tmp = (struct clog_request *)rq_buffer; 231 232 /* 233 * We need a separate dm_ulog_request struct, one that can carry 234 * a return payload. Otherwise, the memory address after 235 * rq will be altered - leading to problems 236 */ 237 memset(rq_buffer, 0, sizeof(rq_buffer)); 238 memcpy(tmp, rq, sizeof(struct clog_request) + rq->u_rq.data_size); 239 240 /* 241 * With resumes, we only handle our own. 242 * Resume is a special case that requires 243 * local action (to set up CPG), followed by 244 * a cluster action to co-ordinate reading 245 * the disk and checkpointing 246 */ 247 if (tmp->u_rq.request_type == DM_ULOG_RESUME) { 248 if (tmp->originator == my_cluster_id) { 249 r = do_request(tmp, server); 250 251 r = kernel_send(&tmp->u_rq); 252 if (r < 0) 253 LOG_ERROR("Failed to send resume response to kernel"); 254 } 255 return r; 256 } 257 258 r = do_request(tmp, server); 259 260 if (server && 261 (tmp->u_rq.request_type != DM_ULOG_CLEAR_REGION) && 262 (tmp->u_rq.request_type != DM_ULOG_POSTSUSPEND)) { 263 tmp->u_rq.request_type |= DM_ULOG_RESPONSE; 264 265 /* 266 * Errors from previous functions are in the rq struct. 267 */ 268 r = cluster_send(tmp); 269 if (r < 0) 270 LOG_ERROR("cluster_send failed: %s", strerror(-r)); 271 } 272 273 return r; 274 } 275 276 static int handle_cluster_response(struct clog_cpg *entry, 277 struct clog_request *rq) 278 { 279 int r = 0; 280 struct clog_request *orig_rq; 281 282 /* 283 * If I didn't send it, then I don't care about the response 284 */ 285 if (rq->originator != my_cluster_id) 286 return 0; 287 288 rq->u_rq.request_type &= ~DM_ULOG_RESPONSE; 289 orig_rq = get_matching_rq(rq, &entry->working_list); 290 291 if (!orig_rq) { 292 /* Unable to find match for response */ 293 294 LOG_ERROR("[%s] No match for cluster response: %s:%u", 295 SHORT_UUID(rq->u_rq.uuid), 296 _RQ_TYPE(rq->u_rq.request_type), 297 rq->u_rq.seq); 298 299 LOG_ERROR("Current local list:"); 300 if (dm_list_empty(&entry->working_list)) 301 LOG_ERROR(" [none]"); 302 303 dm_list_iterate_items(orig_rq, &entry->working_list) 304 LOG_ERROR(" [%s] %s:%u", 305 SHORT_UUID(orig_rq->u_rq.uuid), 306 _RQ_TYPE(orig_rq->u_rq.request_type), 307 orig_rq->u_rq.seq); 308 309 return -EINVAL; 310 } 311 312 if (log_resp_rec > 0) { 313 LOG_COND(log_resend_requests, 314 "[%s] Response received to %s/#%u", 315 SHORT_UUID(rq->u_rq.uuid), 316 _RQ_TYPE(rq->u_rq.request_type), 317 rq->u_rq.seq); 318 log_resp_rec--; 319 } 320 321 /* FIXME: Ensure memcpy cannot explode */ 322 memcpy(orig_rq, rq, sizeof(*rq) + rq->u_rq.data_size); 323 324 r = kernel_send(&orig_rq->u_rq); 325 if (r) 326 LOG_ERROR("Failed to send response to kernel"); 327 328 free(orig_rq); 329 return r; 330 } 331 332 static struct clog_cpg *find_clog_cpg(cpg_handle_t handle) 333 { 334 struct clog_cpg *match; 335 336 dm_list_iterate_items(match, &clog_cpg_list) 337 if (match->handle == handle) 338 return match; 339 340 return NULL; 341 } 342 343 /* 344 * prepare_checkpoint 345 * @entry: clog_cpg describing the log 346 * @cp_requester: nodeid requesting the checkpoint 347 * 348 * Creates and fills in a new checkpoint_data struct. 349 * 350 * Returns: checkpoint_data on success, NULL on error 351 */ 352 static struct checkpoint_data *prepare_checkpoint(struct clog_cpg *entry, 353 uint32_t cp_requester) 354 { 355 int r; 356 struct checkpoint_data *new; 357 358 if (entry->state != VALID) { 359 /* 360 * We can't store bitmaps yet, because the log is not 361 * valid yet. 362 */ 363 LOG_ERROR("Forced to refuse checkpoint for nodeid %u - log not valid yet", 364 cp_requester); 365 return NULL; 366 } 367 368 new = malloc(sizeof(*new)); 369 if (!new) { 370 LOG_ERROR("Unable to create checkpoint data for %u", 371 cp_requester); 372 return NULL; 373 } 374 memset(new, 0, sizeof(*new)); 375 new->requester = cp_requester; 376 strncpy(new->uuid, entry->name.value, entry->name.length); 377 378 new->bitmap_size = push_state(entry->name.value, entry->luid, 379 "clean_bits", 380 &new->clean_bits, cp_requester); 381 if (new->bitmap_size <= 0) { 382 LOG_ERROR("Failed to store clean_bits to checkpoint for node %u", 383 new->requester); 384 free(new); 385 return NULL; 386 } 387 388 new->bitmap_size = push_state(entry->name.value, entry->luid, 389 "sync_bits", 390 &new->sync_bits, cp_requester); 391 if (new->bitmap_size <= 0) { 392 LOG_ERROR("Failed to store sync_bits to checkpoint for node %u", 393 new->requester); 394 free(new->clean_bits); 395 free(new); 396 return NULL; 397 } 398 399 r = push_state(entry->name.value, entry->luid, 400 "recovering_region", 401 &new->recovering_region, cp_requester); 402 if (r <= 0) { 403 LOG_ERROR("Failed to store recovering_region to checkpoint for node %u", 404 new->requester); 405 free(new->sync_bits); 406 free(new->clean_bits); 407 free(new); 408 return NULL; 409 } 410 LOG_DBG("[%s] Checkpoint prepared for node %u:", 411 SHORT_UUID(new->uuid), new->requester); 412 LOG_DBG(" bitmap_size = %d", new->bitmap_size); 413 414 return new; 415 } 416 417 /* 418 * free_checkpoint 419 * @cp: the checkpoint_data struct to free 420 * 421 */ 422 static void free_checkpoint(struct checkpoint_data *cp) 423 { 424 free(cp->recovering_region); 425 free(cp->sync_bits); 426 free(cp->clean_bits); 427 free(cp); 428 } 429 430 static int export_checkpoint(struct checkpoint_data *cp) 431 { 432 SaCkptCheckpointCreationAttributesT attr; 433 SaCkptCheckpointHandleT h; 434 SaCkptSectionIdT section_id; 435 SaCkptSectionCreationAttributesT section_attr; 436 SaCkptCheckpointOpenFlagsT flags; 437 SaNameT name; 438 SaAisErrorT rv; 439 struct clog_request *rq; 440 int len, r = 0; 441 char buf[32]; 442 443 LOG_DBG("Sending checkpointed data to %u", cp->requester); 444 445 len = snprintf((char *)(name.value), SA_MAX_NAME_LENGTH, 446 "bitmaps_%s_%u", SHORT_UUID(cp->uuid), cp->requester); 447 name.length = len; 448 449 len = strlen(cp->recovering_region) + 1; 450 451 attr.creationFlags = SA_CKPT_WR_ALL_REPLICAS; 452 attr.checkpointSize = cp->bitmap_size * 2 + len; 453 454 attr.retentionDuration = SA_TIME_MAX; 455 attr.maxSections = 4; /* don't know why we need +1 */ 456 457 attr.maxSectionSize = (cp->bitmap_size > len) ? cp->bitmap_size : len; 458 attr.maxSectionIdSize = 22; 459 460 flags = SA_CKPT_CHECKPOINT_READ | 461 SA_CKPT_CHECKPOINT_WRITE | 462 SA_CKPT_CHECKPOINT_CREATE; 463 464 open_retry: 465 rv = saCkptCheckpointOpen(ckpt_handle, &name, &attr, flags, 0, &h); 466 if (rv == SA_AIS_ERR_TRY_AGAIN) { 467 LOG_ERROR("export_checkpoint: ckpt open retry"); 468 usleep(1000); 469 goto open_retry; 470 } 471 472 if (rv == SA_AIS_ERR_EXIST) { 473 LOG_DBG("export_checkpoint: checkpoint already exists"); 474 return -EEXIST; 475 } 476 477 if (rv != SA_AIS_OK) { 478 LOG_ERROR("[%s] Failed to open checkpoint for %u: %s", 479 SHORT_UUID(cp->uuid), cp->requester, 480 str_ais_error(rv)); 481 return -EIO; /* FIXME: better error */ 482 } 483 484 /* 485 * Add section for sync_bits 486 */ 487 section_id.idLen = snprintf(buf, 32, "sync_bits"); 488 section_id.id = (unsigned char *)buf; 489 section_attr.sectionId = §ion_id; 490 section_attr.expirationTime = SA_TIME_END; 491 492 sync_create_retry: 493 rv = saCkptSectionCreate(h, §ion_attr, 494 cp->sync_bits, cp->bitmap_size); 495 if (rv == SA_AIS_ERR_TRY_AGAIN) { 496 LOG_ERROR("Sync checkpoint section create retry"); 497 usleep(1000); 498 goto sync_create_retry; 499 } 500 501 if (rv == SA_AIS_ERR_EXIST) { 502 LOG_DBG("Sync checkpoint section already exists"); 503 saCkptCheckpointClose(h); 504 return -EEXIST; 505 } 506 507 if (rv != SA_AIS_OK) { 508 LOG_ERROR("Sync checkpoint section creation failed: %s", 509 str_ais_error(rv)); 510 saCkptCheckpointClose(h); 511 return -EIO; /* FIXME: better error */ 512 } 513 514 /* 515 * Add section for clean_bits 516 */ 517 section_id.idLen = snprintf(buf, 32, "clean_bits"); 518 section_id.id = (unsigned char *)buf; 519 section_attr.sectionId = §ion_id; 520 section_attr.expirationTime = SA_TIME_END; 521 522 clean_create_retry: 523 rv = saCkptSectionCreate(h, §ion_attr, cp->clean_bits, cp->bitmap_size); 524 if (rv == SA_AIS_ERR_TRY_AGAIN) { 525 LOG_ERROR("Clean checkpoint section create retry"); 526 usleep(1000); 527 goto clean_create_retry; 528 } 529 530 if (rv == SA_AIS_ERR_EXIST) { 531 LOG_DBG("Clean checkpoint section already exists"); 532 saCkptCheckpointClose(h); 533 return -EEXIST; 534 } 535 536 if (rv != SA_AIS_OK) { 537 LOG_ERROR("Clean checkpoint section creation failed: %s", 538 str_ais_error(rv)); 539 saCkptCheckpointClose(h); 540 return -EIO; /* FIXME: better error */ 541 } 542 543 /* 544 * Add section for recovering_region 545 */ 546 section_id.idLen = snprintf(buf, 32, "recovering_region"); 547 section_id.id = (unsigned char *)buf; 548 section_attr.sectionId = §ion_id; 549 section_attr.expirationTime = SA_TIME_END; 550 551 rr_create_retry: 552 rv = saCkptSectionCreate(h, §ion_attr, cp->recovering_region, 553 strlen(cp->recovering_region) + 1); 554 if (rv == SA_AIS_ERR_TRY_AGAIN) { 555 LOG_ERROR("RR checkpoint section create retry"); 556 usleep(1000); 557 goto rr_create_retry; 558 } 559 560 if (rv == SA_AIS_ERR_EXIST) { 561 LOG_DBG("RR checkpoint section already exists"); 562 saCkptCheckpointClose(h); 563 return -EEXIST; 564 } 565 566 if (rv != SA_AIS_OK) { 567 LOG_ERROR("RR checkpoint section creation failed: %s", 568 str_ais_error(rv)); 569 saCkptCheckpointClose(h); 570 return -EIO; /* FIXME: better error */ 571 } 572 573 LOG_DBG("export_checkpoint: closing checkpoint"); 574 saCkptCheckpointClose(h); 575 576 rq = malloc(DM_ULOG_REQUEST_SIZE); 577 if (!rq) { 578 LOG_ERROR("export_checkpoint: Unable to allocate transfer structs"); 579 return -ENOMEM; 580 } 581 memset(rq, 0, sizeof(*rq)); 582 583 dm_list_init(&rq->list); 584 rq->u_rq.request_type = DM_ULOG_CHECKPOINT_READY; 585 rq->originator = cp->requester; /* FIXME: hack to overload meaning of originator */ 586 strncpy(rq->u_rq.uuid, cp->uuid, CPG_MAX_NAME_LENGTH); 587 rq->u_rq.seq = my_cluster_id; 588 589 r = cluster_send(rq); 590 if (r) 591 LOG_ERROR("Failed to send checkpoint ready notice: %s", 592 strerror(-r)); 593 594 free(rq); 595 return 0; 596 } 597 598 static int import_checkpoint(struct clog_cpg *entry, int no_read) 599 { 600 int rtn = 0; 601 SaCkptCheckpointHandleT h; 602 SaCkptSectionIterationHandleT itr; 603 SaCkptSectionDescriptorT desc; 604 SaCkptIOVectorElementT iov; 605 SaNameT name; 606 SaAisErrorT rv; 607 char *bitmap = NULL; 608 int len; 609 610 bitmap = malloc(1024*1024); 611 if (!bitmap) 612 return -ENOMEM; 613 614 len = snprintf((char *)(name.value), SA_MAX_NAME_LENGTH, "bitmaps_%s_%u", 615 SHORT_UUID(entry->name.value), my_cluster_id); 616 name.length = len; 617 618 open_retry: 619 rv = saCkptCheckpointOpen(ckpt_handle, &name, NULL, 620 SA_CKPT_CHECKPOINT_READ, 0, &h); 621 if (rv == SA_AIS_ERR_TRY_AGAIN) { 622 LOG_ERROR("import_checkpoint: ckpt open retry"); 623 usleep(1000); 624 goto open_retry; 625 } 626 627 if (rv != SA_AIS_OK) { 628 LOG_ERROR("[%s] Failed to open checkpoint: %s", 629 SHORT_UUID(entry->name.value), str_ais_error(rv)); 630 return -EIO; /* FIXME: better error */ 631 } 632 633 unlink_retry: 634 rv = saCkptCheckpointUnlink(ckpt_handle, &name); 635 if (rv == SA_AIS_ERR_TRY_AGAIN) { 636 LOG_ERROR("import_checkpoint: ckpt unlink retry"); 637 usleep(1000); 638 goto unlink_retry; 639 } 640 641 if (no_read) { 642 LOG_DBG("Checkpoint for this log already received"); 643 goto no_read; 644 } 645 646 init_retry: 647 rv = saCkptSectionIterationInitialize(h, SA_CKPT_SECTIONS_ANY, 648 SA_TIME_END, &itr); 649 if (rv == SA_AIS_ERR_TRY_AGAIN) { 650 LOG_ERROR("import_checkpoint: sync create retry"); 651 usleep(1000); 652 goto init_retry; 653 } 654 655 if (rv != SA_AIS_OK) { 656 LOG_ERROR("[%s] Sync checkpoint section creation failed: %s", 657 SHORT_UUID(entry->name.value), str_ais_error(rv)); 658 return -EIO; /* FIXME: better error */ 659 } 660 661 len = 0; 662 while (1) { 663 rv = saCkptSectionIterationNext(itr, &desc); 664 if (rv == SA_AIS_OK) 665 len++; 666 else if ((rv == SA_AIS_ERR_NO_SECTIONS) && len) 667 break; 668 else if (rv != SA_AIS_ERR_TRY_AGAIN) { 669 LOG_ERROR("saCkptSectionIterationNext failure: %d", rv); 670 break; 671 } 672 } 673 saCkptSectionIterationFinalize(itr); 674 if (len != 3) { 675 LOG_ERROR("import_checkpoint: %d checkpoint sections found", 676 len); 677 usleep(1000); 678 goto init_retry; 679 } 680 saCkptSectionIterationInitialize(h, SA_CKPT_SECTIONS_ANY, 681 SA_TIME_END, &itr); 682 683 while (1) { 684 rv = saCkptSectionIterationNext(itr, &desc); 685 if (rv == SA_AIS_ERR_NO_SECTIONS) 686 break; 687 688 if (rv == SA_AIS_ERR_TRY_AGAIN) { 689 LOG_ERROR("import_checkpoint: ckpt iternext retry"); 690 usleep(1000); 691 continue; 692 } 693 694 if (rv != SA_AIS_OK) { 695 LOG_ERROR("import_checkpoint: clean checkpoint section " 696 "creation failed: %s", str_ais_error(rv)); 697 rtn = -EIO; /* FIXME: better error */ 698 goto fail; 699 } 700 701 if (!desc.sectionSize) { 702 LOG_ERROR("Checkpoint section empty"); 703 continue; 704 } 705 706 memset(bitmap, 0, sizeof(*bitmap)); 707 iov.sectionId = desc.sectionId; 708 iov.dataBuffer = bitmap; 709 iov.dataSize = desc.sectionSize; 710 iov.dataOffset = 0; 711 712 read_retry: 713 rv = saCkptCheckpointRead(h, &iov, 1, NULL); 714 if (rv == SA_AIS_ERR_TRY_AGAIN) { 715 LOG_ERROR("ckpt read retry"); 716 usleep(1000); 717 goto read_retry; 718 } 719 720 if (rv != SA_AIS_OK) { 721 LOG_ERROR("import_checkpoint: ckpt read error: %s", 722 str_ais_error(rv)); 723 rtn = -EIO; /* FIXME: better error */ 724 goto fail; 725 } 726 727 if (iov.readSize) { 728 if (pull_state(entry->name.value, entry->luid, 729 (char *)desc.sectionId.id, bitmap, 730 iov.readSize)) { 731 LOG_ERROR("Error loading state"); 732 rtn = -EIO; 733 goto fail; 734 } 735 } else { 736 /* Need to request new checkpoint */ 737 rtn = -EAGAIN; 738 goto fail; 739 } 740 } 741 742 fail: 743 saCkptSectionIterationFinalize(itr); 744 no_read: 745 saCkptCheckpointClose(h); 746 747 free(bitmap); 748 return rtn; 749 } 750 751 static void do_checkpoints(struct clog_cpg *entry, int leaving) 752 { 753 struct checkpoint_data *cp; 754 755 for (cp = entry->checkpoint_list; cp;) { 756 /* 757 * FIXME: Check return code. Could send failure 758 * notice in rq in export_checkpoint function 759 * by setting rq->error 760 */ 761 switch (export_checkpoint(cp)) { 762 case -EEXIST: 763 LOG_SPRINT(entry, "[%s] Checkpoint for %u already handled%s", 764 SHORT_UUID(entry->name.value), cp->requester, 765 (leaving) ? "(L)": ""); 766 LOG_COND(log_checkpoint, 767 "[%s] Checkpoint for %u already handled%s", 768 SHORT_UUID(entry->name.value), cp->requester, 769 (leaving) ? "(L)": ""); 770 entry->checkpoint_list = cp->next; 771 free_checkpoint(cp); 772 cp = entry->checkpoint_list; 773 break; 774 case 0: 775 LOG_SPRINT(entry, "[%s] Checkpoint data available for node %u%s", 776 SHORT_UUID(entry->name.value), cp->requester, 777 (leaving) ? "(L)": ""); 778 LOG_COND(log_checkpoint, 779 "[%s] Checkpoint data available for node %u%s", 780 SHORT_UUID(entry->name.value), cp->requester, 781 (leaving) ? "(L)": ""); 782 entry->checkpoint_list = cp->next; 783 free_checkpoint(cp); 784 cp = entry->checkpoint_list; 785 break; 786 default: 787 /* FIXME: Skipping will cause list corruption */ 788 LOG_ERROR("[%s] Failed to export checkpoint for %u%s", 789 SHORT_UUID(entry->name.value), cp->requester, 790 (leaving) ? "(L)": ""); 791 } 792 } 793 } 794 795 static int resend_requests(struct clog_cpg *entry) 796 { 797 int r = 0; 798 struct clog_request *rq, *n; 799 800 if (!entry->resend_requests || entry->delay) 801 return 0; 802 803 if (entry->state != VALID) 804 return 0; 805 806 entry->resend_requests = 0; 807 808 dm_list_iterate_items_safe(rq, n, &entry->working_list) { 809 dm_list_del(&rq->list); 810 811 if (strcmp(entry->name.value, rq->u_rq.uuid)) { 812 LOG_ERROR("[%s] Stray request from another log (%s)", 813 SHORT_UUID(entry->name.value), 814 SHORT_UUID(rq->u_rq.uuid)); 815 free(rq); 816 continue; 817 } 818 819 switch (rq->u_rq.request_type) { 820 case DM_ULOG_SET_REGION_SYNC: 821 /* 822 * Some requests simply do not need to be resent. 823 * If it is a request that just changes log state, 824 * then it doesn't need to be resent (everyone makes 825 * updates). 826 */ 827 LOG_COND(log_resend_requests, 828 "[%s] Skipping resend of %s/#%u...", 829 SHORT_UUID(entry->name.value), 830 _RQ_TYPE(rq->u_rq.request_type), 831 rq->u_rq.seq); 832 LOG_SPRINT(entry, "### No resend: [%s] %s/%u ###", 833 SHORT_UUID(entry->name.value), 834 _RQ_TYPE(rq->u_rq.request_type), 835 rq->u_rq.seq); 836 837 rq->u_rq.data_size = 0; 838 kernel_send(&rq->u_rq); 839 840 break; 841 842 default: 843 /* 844 * If an action or a response is required, then 845 * the request must be resent. 846 */ 847 LOG_COND(log_resend_requests, 848 "[%s] Resending %s(#%u) due to new server(%u)", 849 SHORT_UUID(entry->name.value), 850 _RQ_TYPE(rq->u_rq.request_type), 851 rq->u_rq.seq, entry->lowest_id); 852 LOG_SPRINT(entry, "*** Resending: [%s] %s/%u ***", 853 SHORT_UUID(entry->name.value), 854 _RQ_TYPE(rq->u_rq.request_type), 855 rq->u_rq.seq); 856 r = cluster_send(rq); 857 if (r < 0) 858 LOG_ERROR("Failed resend"); 859 } 860 free(rq); 861 } 862 863 return r; 864 } 865 866 static int do_cluster_work(void *data) 867 { 868 int r = SA_AIS_OK; 869 struct clog_cpg *entry; 870 871 dm_list_iterate_items(entry, &clog_cpg_list) { 872 r = cpg_dispatch(entry->handle, CPG_DISPATCH_ALL); 873 if (r != SA_AIS_OK) 874 LOG_ERROR("cpg_dispatch failed: %s", str_ais_error(r)); 875 876 if (entry->free_me) { 877 free(entry); 878 continue; 879 } 880 do_checkpoints(entry, 0); 881 882 resend_requests(entry); 883 } 884 885 return (r == SA_AIS_OK) ? 0 : -1; /* FIXME: good error number? */ 886 } 887 888 static int flush_startup_list(struct clog_cpg *entry) 889 { 890 int r = 0; 891 int i_was_server; 892 struct clog_request *rq, *n; 893 struct checkpoint_data *new; 894 895 dm_list_iterate_items_safe(rq, n, &entry->startup_list) { 896 dm_list_del(&rq->list); 897 898 if (rq->u_rq.request_type == DM_ULOG_MEMBER_JOIN) { 899 new = prepare_checkpoint(entry, rq->originator); 900 if (!new) { 901 /* 902 * FIXME: Need better error handling. Other nodes 903 * will be trying to send the checkpoint too, and we 904 * must continue processing the list; so report error 905 * but continue. 906 */ 907 LOG_ERROR("Failed to prepare checkpoint for %u!!!", 908 rq->originator); 909 free(rq); 910 continue; 911 } 912 LOG_SPRINT(entry, "[%s] Checkpoint prepared for %u", 913 SHORT_UUID(entry->name.value), rq->originator); 914 LOG_COND(log_checkpoint, "[%s] Checkpoint prepared for %u", 915 SHORT_UUID(entry->name.value), rq->originator); 916 new->next = entry->checkpoint_list; 917 entry->checkpoint_list = new; 918 } else { 919 LOG_DBG("[%s] Processing delayed request: %s", 920 SHORT_UUID(rq->u_rq.uuid), 921 _RQ_TYPE(rq->u_rq.request_type)); 922 i_was_server = (rq->pit_server == my_cluster_id) ? 1 : 0; 923 r = handle_cluster_request(entry, rq, i_was_server); 924 925 if (r) 926 /* 927 * FIXME: If we error out here, we will never get 928 * another opportunity to retry these requests 929 */ 930 LOG_ERROR("Error while processing delayed CPG message"); 931 } 932 free(rq); 933 } 934 935 return 0; 936 } 937 938 static void cpg_message_callback(cpg_handle_t handle, const struct cpg_name *gname, 939 uint32_t nodeid, uint32_t pid, 940 void *msg, size_t msg_len) 941 { 942 int i; 943 int r = 0; 944 int i_am_server; 945 int response = 0; 946 struct clog_request *rq = msg; 947 struct clog_request *tmp_rq; 948 struct clog_cpg *match; 949 950 match = find_clog_cpg(handle); 951 if (!match) { 952 LOG_ERROR("Unable to find clog_cpg for cluster message"); 953 return; 954 } 955 956 if ((nodeid == my_cluster_id) && 957 !(rq->u_rq.request_type & DM_ULOG_RESPONSE) && 958 (rq->u_rq.request_type != DM_ULOG_RESUME) && 959 (rq->u_rq.request_type != DM_ULOG_CLEAR_REGION) && 960 (rq->u_rq.request_type != DM_ULOG_CHECKPOINT_READY)) { 961 tmp_rq = malloc(DM_ULOG_REQUEST_SIZE); 962 if (!tmp_rq) { 963 /* 964 * FIXME: It may be possible to continue... but we 965 * would not be able to resend any messages that might 966 * be necessary during membership changes 967 */ 968 LOG_ERROR("[%s] Unable to record request: -ENOMEM", 969 SHORT_UUID(rq->u_rq.uuid)); 970 return; 971 } 972 memcpy(tmp_rq, rq, sizeof(*rq) + rq->u_rq.data_size); 973 dm_list_init(&tmp_rq->list); 974 dm_list_add( &match->working_list, &tmp_rq->list); 975 } 976 977 if (rq->u_rq.request_type == DM_ULOG_POSTSUSPEND) { 978 /* 979 * If the server (lowest_id) indicates it is leaving, 980 * then we must resend any outstanding requests. However, 981 * we do not want to resend them if the next server in 982 * line is in the process of leaving. 983 */ 984 if (nodeid == my_cluster_id) { 985 LOG_COND(log_resend_requests, "[%s] I am leaving.1.....", 986 SHORT_UUID(rq->u_rq.uuid)); 987 } else { 988 if (nodeid < my_cluster_id) { 989 if (nodeid == match->lowest_id) { 990 match->resend_requests = 1; 991 LOG_COND(log_resend_requests, "[%s] %u is leaving, resend required%s", 992 SHORT_UUID(rq->u_rq.uuid), nodeid, 993 (dm_list_empty(&match->working_list)) ? " -- working_list empty": ""); 994 995 dm_list_iterate_items(tmp_rq, &match->working_list) 996 LOG_COND(log_resend_requests, 997 "[%s] %s/%u", 998 SHORT_UUID(tmp_rq->u_rq.uuid), 999 _RQ_TYPE(tmp_rq->u_rq.request_type), 1000 tmp_rq->u_rq.seq); 1001 } 1002 1003 match->delay++; 1004 LOG_COND(log_resend_requests, "[%s] %u is leaving, delay = %d", 1005 SHORT_UUID(rq->u_rq.uuid), nodeid, match->delay); 1006 } 1007 rq->originator = nodeid; /* don't really need this, but nice for debug */ 1008 goto out; 1009 } 1010 } 1011 1012 /* 1013 * We can receive messages after we do a cpg_leave but before we 1014 * get our config callback. However, since we can't respond after 1015 * leaving, we simply return. 1016 */ 1017 if (match->state == LEAVING) 1018 return; 1019 1020 i_am_server = (my_cluster_id == match->lowest_id) ? 1 : 0; 1021 1022 if (rq->u_rq.request_type == DM_ULOG_CHECKPOINT_READY) { 1023 if (my_cluster_id == rq->originator) { 1024 /* Redundant checkpoints ignored if match->valid */ 1025 LOG_SPRINT(match, "[%s] CHECKPOINT_READY notification from %u", 1026 SHORT_UUID(rq->u_rq.uuid), nodeid); 1027 if (import_checkpoint(match, (match->state != INVALID))) { 1028 LOG_SPRINT(match, 1029 "[%s] Failed to import checkpoint from %u", 1030 SHORT_UUID(rq->u_rq.uuid), nodeid); 1031 LOG_ERROR("[%s] Failed to import checkpoint from %u", 1032 SHORT_UUID(rq->u_rq.uuid), nodeid); 1033 kill(getpid(), SIGUSR1); 1034 /* Could we retry? */ 1035 goto out; 1036 } else if (match->state == INVALID) { 1037 LOG_SPRINT(match, 1038 "[%s] Checkpoint data received from %u. Log is now valid", 1039 SHORT_UUID(match->name.value), nodeid); 1040 LOG_COND(log_checkpoint, 1041 "[%s] Checkpoint data received from %u. Log is now valid", 1042 SHORT_UUID(match->name.value), nodeid); 1043 match->state = VALID; 1044 1045 flush_startup_list(match); 1046 } else { 1047 LOG_SPRINT(match, 1048 "[%s] Redundant checkpoint from %u ignored.", 1049 SHORT_UUID(rq->u_rq.uuid), nodeid); 1050 } 1051 } 1052 goto out; 1053 } 1054 1055 if (rq->u_rq.request_type & DM_ULOG_RESPONSE) { 1056 response = 1; 1057 r = handle_cluster_response(match, rq); 1058 } else { 1059 rq->originator = nodeid; 1060 1061 if (match->state == LEAVING) { 1062 LOG_ERROR("[%s] Ignoring %s from %u. Reason: I'm leaving", 1063 SHORT_UUID(rq->u_rq.uuid), _RQ_TYPE(rq->u_rq.request_type), 1064 rq->originator); 1065 goto out; 1066 } 1067 1068 if (match->state == INVALID) { 1069 LOG_DBG("Log not valid yet, storing request"); 1070 tmp_rq = malloc(DM_ULOG_REQUEST_SIZE); 1071 if (!tmp_rq) { 1072 LOG_ERROR("cpg_message_callback: Unable to" 1073 " allocate transfer structs"); 1074 r = -ENOMEM; /* FIXME: Better error #? */ 1075 goto out; 1076 } 1077 1078 memcpy(tmp_rq, rq, sizeof(*rq) + rq->u_rq.data_size); 1079 tmp_rq->pit_server = match->lowest_id; 1080 dm_list_init(&tmp_rq->list); 1081 dm_list_add(&match->startup_list, &tmp_rq->list); 1082 goto out; 1083 } 1084 1085 r = handle_cluster_request(match, rq, i_am_server); 1086 } 1087 1088 /* 1089 * If the log is now valid, we can queue the checkpoints 1090 */ 1091 for (i = match->checkpoints_needed; i; ) { 1092 struct checkpoint_data *new; 1093 1094 if (log_get_state(&rq->u_rq) != LOG_RESUMED) { 1095 LOG_DBG("[%s] Withholding checkpoints until log is valid (%s from %u)", 1096 SHORT_UUID(rq->u_rq.uuid), _RQ_TYPE(rq->u_rq.request_type), nodeid); 1097 break; 1098 } 1099 1100 i--; 1101 new = prepare_checkpoint(match, match->checkpoint_requesters[i]); 1102 if (!new) { 1103 /* FIXME: Need better error handling */ 1104 LOG_ERROR("[%s] Failed to prepare checkpoint for %u!!!", 1105 SHORT_UUID(rq->u_rq.uuid), match->checkpoint_requesters[i]); 1106 break; 1107 } 1108 LOG_SPRINT(match, "[%s] Checkpoint prepared for %u* (%s)", 1109 SHORT_UUID(rq->u_rq.uuid), match->checkpoint_requesters[i], 1110 (log_get_state(&rq->u_rq) != LOG_RESUMED)? "LOG_RESUMED": "LOG_SUSPENDED"); 1111 LOG_COND(log_checkpoint, "[%s] Checkpoint prepared for %u*", 1112 SHORT_UUID(rq->u_rq.uuid), match->checkpoint_requesters[i]); 1113 match->checkpoints_needed--; 1114 1115 new->next = match->checkpoint_list; 1116 match->checkpoint_list = new; 1117 } 1118 1119 out: 1120 /* nothing happens after this point. It is just for debugging */ 1121 if (r) { 1122 LOG_ERROR("[%s] Error while processing CPG message, %s: %s", 1123 SHORT_UUID(rq->u_rq.uuid), 1124 _RQ_TYPE(rq->u_rq.request_type & ~DM_ULOG_RESPONSE), 1125 strerror(-r)); 1126 LOG_ERROR("[%s] Response : %s", SHORT_UUID(rq->u_rq.uuid), 1127 (response) ? "YES" : "NO"); 1128 LOG_ERROR("[%s] Originator: %u", 1129 SHORT_UUID(rq->u_rq.uuid), rq->originator); 1130 if (response) 1131 LOG_ERROR("[%s] Responder : %u", 1132 SHORT_UUID(rq->u_rq.uuid), nodeid); 1133 1134 LOG_ERROR("HISTORY::"); 1135 for (i = 0; i < DEBUGGING_HISTORY; i++) { 1136 match->idx++; 1137 match->idx = match->idx % DEBUGGING_HISTORY; 1138 if (match->debugging[match->idx][0] == '\0') 1139 continue; 1140 LOG_ERROR("%d:%d) %s", i, match->idx, 1141 match->debugging[match->idx]); 1142 } 1143 } else if (!(rq->u_rq.request_type & DM_ULOG_RESPONSE) || 1144 (rq->originator == my_cluster_id)) { 1145 if (!response) 1146 LOG_SPRINT(match, "SEQ#=%u, UUID=%s, TYPE=%s, ORIG=%u, RESP=%s", 1147 rq->u_rq.seq, SHORT_UUID(rq->u_rq.uuid), 1148 _RQ_TYPE(rq->u_rq.request_type), 1149 rq->originator, (response) ? "YES" : "NO"); 1150 else 1151 LOG_SPRINT(match, "SEQ#=%u, UUID=%s, TYPE=%s, ORIG=%u, RESP=%s, RSPR=%u", 1152 rq->u_rq.seq, SHORT_UUID(rq->u_rq.uuid), 1153 _RQ_TYPE(rq->u_rq.request_type), 1154 rq->originator, (response) ? "YES" : "NO", 1155 nodeid); 1156 } 1157 } 1158 1159 static void cpg_join_callback(struct clog_cpg *match, 1160 const struct cpg_address *joined, 1161 const struct cpg_address *member_list, 1162 size_t member_list_entries) 1163 { 1164 int i; 1165 int my_pid = getpid(); 1166 uint32_t lowest = match->lowest_id; 1167 struct clog_request *rq; 1168 char dbuf[32]; 1169 1170 /* Assign my_cluster_id */ 1171 if ((my_cluster_id == 0xDEAD) && (joined->pid == my_pid)) 1172 my_cluster_id = joined->nodeid; 1173 1174 /* Am I the very first to join? */ 1175 if (member_list_entries == 1) { 1176 match->lowest_id = joined->nodeid; 1177 match->state = VALID; 1178 } 1179 1180 /* If I am part of the joining list, I do not send checkpoints */ 1181 if (joined->nodeid == my_cluster_id) 1182 goto out; 1183 1184 memset(dbuf, 0, sizeof(dbuf)); 1185 for (i = 0; i < (member_list_entries-1); i++) 1186 sprintf(dbuf+strlen(dbuf), "%u-", member_list[i].nodeid); 1187 sprintf(dbuf+strlen(dbuf), "(%u)", joined->nodeid); 1188 LOG_COND(log_checkpoint, "[%s] Joining node, %u needs checkpoint [%s]", 1189 SHORT_UUID(match->name.value), joined->nodeid, dbuf); 1190 1191 /* 1192 * FIXME: remove checkpoint_requesters/checkpoints_needed, and use 1193 * the startup_list interface exclusively 1194 */ 1195 if (dm_list_empty(&match->startup_list) && (match->state == VALID) && 1196 (match->checkpoints_needed < MAX_CHECKPOINT_REQUESTERS)) { 1197 match->checkpoint_requesters[match->checkpoints_needed++] = joined->nodeid; 1198 goto out; 1199 } 1200 1201 rq = malloc(DM_ULOG_REQUEST_SIZE); 1202 if (!rq) { 1203 LOG_ERROR("cpg_config_callback: " 1204 "Unable to allocate transfer structs"); 1205 LOG_ERROR("cpg_config_callback: " 1206 "Unable to perform checkpoint"); 1207 goto out; 1208 } 1209 rq->u_rq.request_type = DM_ULOG_MEMBER_JOIN; 1210 rq->originator = joined->nodeid; 1211 dm_list_init(&rq->list); 1212 dm_list_add(&match->startup_list, &rq->list); 1213 1214 out: 1215 /* Find the lowest_id, i.e. the server */ 1216 match->lowest_id = member_list[0].nodeid; 1217 for (i = 0; i < member_list_entries; i++) 1218 if (match->lowest_id > member_list[i].nodeid) 1219 match->lowest_id = member_list[i].nodeid; 1220 1221 if (lowest == 0xDEAD) 1222 LOG_COND(log_membership_change, "[%s] Server change <none> -> %u (%u %s)", 1223 SHORT_UUID(match->name.value), match->lowest_id, 1224 joined->nodeid, (member_list_entries == 1) ? 1225 "is first to join" : "joined"); 1226 else if (lowest != match->lowest_id) 1227 LOG_COND(log_membership_change, "[%s] Server change %u -> %u (%u joined)", 1228 SHORT_UUID(match->name.value), lowest, 1229 match->lowest_id, joined->nodeid); 1230 else 1231 LOG_COND(log_membership_change, "[%s] Server unchanged at %u (%u joined)", 1232 SHORT_UUID(match->name.value), 1233 lowest, joined->nodeid); 1234 LOG_SPRINT(match, "+++ UUID=%s %u join +++", 1235 SHORT_UUID(match->name.value), joined->nodeid); 1236 } 1237 1238 static void cpg_leave_callback(struct clog_cpg *match, 1239 const struct cpg_address *left, 1240 const struct cpg_address *member_list, 1241 size_t member_list_entries) 1242 { 1243 int i, j, fd; 1244 uint32_t lowest = match->lowest_id; 1245 struct clog_request *rq, *n; 1246 struct checkpoint_data *p_cp, *c_cp; 1247 1248 LOG_SPRINT(match, "--- UUID=%s %u left ---", 1249 SHORT_UUID(match->name.value), left->nodeid); 1250 1251 /* Am I leaving? */ 1252 if (my_cluster_id == left->nodeid) { 1253 LOG_DBG("Finalizing leave..."); 1254 dm_list_del(&match->list); 1255 1256 cpg_fd_get(match->handle, &fd); 1257 links_unregister(fd); 1258 1259 cluster_postsuspend(match->name.value, match->luid); 1260 1261 dm_list_iterate_items_safe(rq, n, &match->working_list) { 1262 dm_list_del(&rq->list); 1263 1264 if (rq->u_rq.request_type == DM_ULOG_POSTSUSPEND) 1265 kernel_send(&rq->u_rq); 1266 free(rq); 1267 } 1268 1269 cpg_finalize(match->handle); 1270 1271 match->free_me = 1; 1272 match->lowest_id = 0xDEAD; 1273 match->state = INVALID; 1274 } 1275 1276 /* Remove any pending checkpoints for the leaving node. */ 1277 for (p_cp = NULL, c_cp = match->checkpoint_list; 1278 c_cp && (c_cp->requester != left->nodeid); 1279 p_cp = c_cp, c_cp = c_cp->next); 1280 if (c_cp) { 1281 if (p_cp) 1282 p_cp->next = c_cp->next; 1283 else 1284 match->checkpoint_list = c_cp->next; 1285 1286 LOG_COND(log_checkpoint, 1287 "[%s] Removing pending checkpoint (%u is leaving)", 1288 SHORT_UUID(match->name.value), left->nodeid); 1289 free_checkpoint(c_cp); 1290 } 1291 dm_list_iterate_items_safe(rq, n, &match->startup_list) { 1292 if ((rq->u_rq.request_type == DM_ULOG_MEMBER_JOIN) && 1293 (rq->originator == left->nodeid)) { 1294 LOG_COND(log_checkpoint, 1295 "[%s] Removing pending ckpt from startup list (%u is leaving)", 1296 SHORT_UUID(match->name.value), left->nodeid); 1297 dm_list_del(&rq->list); 1298 free(rq); 1299 } 1300 } 1301 for (i = 0, j = 0; i < match->checkpoints_needed; i++, j++) { 1302 match->checkpoint_requesters[j] = match->checkpoint_requesters[i]; 1303 if (match->checkpoint_requesters[i] == left->nodeid) { 1304 LOG_ERROR("[%s] Removing pending ckpt from needed list (%u is leaving)", 1305 SHORT_UUID(match->name.value), left->nodeid); 1306 j--; 1307 } 1308 } 1309 match->checkpoints_needed = j; 1310 1311 if (left->nodeid < my_cluster_id) { 1312 match->delay = (match->delay > 0) ? match->delay - 1 : 0; 1313 if (!match->delay && dm_list_empty(&match->working_list)) 1314 match->resend_requests = 0; 1315 LOG_COND(log_resend_requests, "[%s] %u has left, delay = %d%s", 1316 SHORT_UUID(match->name.value), left->nodeid, 1317 match->delay, (dm_list_empty(&match->working_list)) ? 1318 " -- working_list empty": ""); 1319 } 1320 1321 /* Find the lowest_id, i.e. the server */ 1322 if (!member_list_entries) { 1323 match->lowest_id = 0xDEAD; 1324 LOG_COND(log_membership_change, "[%s] Server change %u -> <none> " 1325 "(%u is last to leave)", 1326 SHORT_UUID(match->name.value), left->nodeid, 1327 left->nodeid); 1328 return; 1329 } 1330 1331 match->lowest_id = member_list[0].nodeid; 1332 for (i = 0; i < member_list_entries; i++) 1333 if (match->lowest_id > member_list[i].nodeid) 1334 match->lowest_id = member_list[i].nodeid; 1335 1336 if (lowest != match->lowest_id) { 1337 LOG_COND(log_membership_change, "[%s] Server change %u -> %u (%u left)", 1338 SHORT_UUID(match->name.value), lowest, 1339 match->lowest_id, left->nodeid); 1340 } else 1341 LOG_COND(log_membership_change, "[%s] Server unchanged at %u (%u left)", 1342 SHORT_UUID(match->name.value), lowest, left->nodeid); 1343 1344 if ((match->state == INVALID) && !match->free_me) { 1345 /* 1346 * If all CPG members are waiting for checkpoints and they 1347 * are all present in my startup_list, then I was the first to 1348 * join and I must assume control. 1349 * 1350 * We do not normally end up here, but if there was a quick 1351 * 'resume -> suspend -> resume' across the cluster, we may 1352 * have initially thought we were not the first to join because 1353 * of the presence of out-going (and unable to respond) members. 1354 */ 1355 1356 i = 1; /* We do not have a DM_ULOG_MEMBER_JOIN entry of our own */ 1357 dm_list_iterate_items(rq, &match->startup_list) 1358 if (rq->u_rq.request_type == DM_ULOG_MEMBER_JOIN) 1359 i++; 1360 1361 if (i == member_list_entries) { 1362 /* 1363 * Last node who could have given me a checkpoint just left. 1364 * Setting log state to VALID and acting as 'first join'. 1365 */ 1366 match->state = VALID; 1367 flush_startup_list(match); 1368 } 1369 } 1370 } 1371 1372 static void cpg_config_callback(cpg_handle_t handle, const struct cpg_name *gname, 1373 const struct cpg_address *member_list, 1374 size_t member_list_entries, 1375 const struct cpg_address *left_list, 1376 size_t left_list_entries, 1377 const struct cpg_address *joined_list, 1378 size_t joined_list_entries) 1379 { 1380 struct clog_cpg *match; 1381 int found = 0; 1382 1383 dm_list_iterate_items(match, &clog_cpg_list) 1384 if (match->handle == handle) { 1385 found = 1; 1386 break; 1387 } 1388 1389 if (!found) { 1390 LOG_ERROR("Unable to find match for CPG config callback"); 1391 return; 1392 } 1393 1394 if ((joined_list_entries + left_list_entries) > 1) 1395 LOG_ERROR("[%s] More than one node joining/leaving", 1396 SHORT_UUID(match->name.value)); 1397 1398 if (joined_list_entries) 1399 cpg_join_callback(match, joined_list, 1400 member_list, member_list_entries); 1401 else 1402 cpg_leave_callback(match, left_list, 1403 member_list, member_list_entries); 1404 } 1405 1406 cpg_callbacks_t cpg_callbacks = { 1407 .cpg_deliver_fn = cpg_message_callback, 1408 .cpg_confchg_fn = cpg_config_callback, 1409 }; 1410 1411 /* 1412 * remove_checkpoint 1413 * @entry 1414 * 1415 * Returns: 1 if checkpoint removed, 0 if no checkpoints, -EXXX on error 1416 */ 1417 int remove_checkpoint(struct clog_cpg *entry) 1418 { 1419 int len; 1420 SaNameT name; 1421 SaAisErrorT rv; 1422 SaCkptCheckpointHandleT h; 1423 1424 len = snprintf((char *)(name.value), SA_MAX_NAME_LENGTH, "bitmaps_%s_%u", 1425 SHORT_UUID(entry->name.value), my_cluster_id); 1426 name.length = len; 1427 1428 open_retry: 1429 rv = saCkptCheckpointOpen(ckpt_handle, &name, NULL, 1430 SA_CKPT_CHECKPOINT_READ, 0, &h); 1431 if (rv == SA_AIS_ERR_TRY_AGAIN) { 1432 LOG_ERROR("abort_startup: ckpt open retry"); 1433 usleep(1000); 1434 goto open_retry; 1435 } 1436 1437 if (rv != SA_AIS_OK) 1438 return 0; 1439 1440 LOG_DBG("[%s] Removing checkpoint", SHORT_UUID(entry->name.value)); 1441 unlink_retry: 1442 rv = saCkptCheckpointUnlink(ckpt_handle, &name); 1443 if (rv == SA_AIS_ERR_TRY_AGAIN) { 1444 LOG_ERROR("abort_startup: ckpt unlink retry"); 1445 usleep(1000); 1446 goto unlink_retry; 1447 } 1448 1449 if (rv != SA_AIS_OK) { 1450 LOG_ERROR("[%s] Failed to unlink checkpoint: %s", 1451 SHORT_UUID(entry->name.value), str_ais_error(rv)); 1452 return -EIO; 1453 } 1454 1455 saCkptCheckpointClose(h); 1456 1457 return 1; 1458 } 1459 1460 int create_cluster_cpg(char *uuid, uint64_t luid) 1461 { 1462 int r; 1463 int size; 1464 struct clog_cpg *new = NULL; 1465 struct clog_cpg *tmp; 1466 1467 dm_list_iterate_items(tmp, &clog_cpg_list) 1468 if (!strncmp(tmp->name.value, uuid, CPG_MAX_NAME_LENGTH)) { 1469 LOG_ERROR("Log entry already exists: %s", uuid); 1470 return -EEXIST; 1471 } 1472 1473 new = malloc(sizeof(*new)); 1474 if (!new) { 1475 LOG_ERROR("Unable to allocate memory for clog_cpg"); 1476 return -ENOMEM; 1477 } 1478 memset(new, 0, sizeof(*new)); 1479 dm_list_init(&new->list); 1480 new->lowest_id = 0xDEAD; 1481 dm_list_init(&new->startup_list); 1482 dm_list_init(&new->working_list); 1483 1484 size = ((strlen(uuid) + 1) > CPG_MAX_NAME_LENGTH) ? 1485 CPG_MAX_NAME_LENGTH : (strlen(uuid) + 1); 1486 strncpy(new->name.value, uuid, size); 1487 new->name.length = size; 1488 new->luid = luid; 1489 1490 /* 1491 * Ensure there are no stale checkpoints around before we join 1492 */ 1493 if (remove_checkpoint(new) == 1) 1494 LOG_COND(log_checkpoint, 1495 "[%s] Removing checkpoints left from previous session", 1496 SHORT_UUID(new->name.value)); 1497 1498 r = cpg_initialize(&new->handle, &cpg_callbacks); 1499 if (r != SA_AIS_OK) { 1500 LOG_ERROR("cpg_initialize failed: Cannot join cluster"); 1501 free(new); 1502 return -EPERM; 1503 } 1504 1505 r = cpg_join(new->handle, &new->name); 1506 if (r != SA_AIS_OK) { 1507 LOG_ERROR("cpg_join failed: Cannot join cluster"); 1508 free(new); 1509 return -EPERM; 1510 } 1511 1512 new->cpg_state = VALID; 1513 dm_list_add(&clog_cpg_list, &new->list); 1514 LOG_DBG("New handle: %llu", (unsigned long long)new->handle); 1515 LOG_DBG("New name: %s", new->name.value); 1516 1517 /* FIXME: better variable */ 1518 cpg_fd_get(new->handle, &r); 1519 links_register(r, "cluster", do_cluster_work, NULL); 1520 1521 return 0; 1522 } 1523 1524 static void abort_startup(struct clog_cpg *del) 1525 { 1526 struct clog_request *rq, *n; 1527 1528 LOG_DBG("[%s] CPG teardown before checkpoint received", 1529 SHORT_UUID(del->name.value)); 1530 1531 dm_list_iterate_items_safe(rq, n, &del->startup_list) { 1532 dm_list_del(&rq->list); 1533 1534 LOG_DBG("[%s] Ignoring request from %u: %s", 1535 SHORT_UUID(del->name.value), rq->originator, 1536 _RQ_TYPE(rq->u_rq.request_type)); 1537 free(rq); 1538 } 1539 1540 remove_checkpoint(del); 1541 } 1542 1543 static int _destroy_cluster_cpg(struct clog_cpg *del) 1544 { 1545 int r; 1546 int state; 1547 1548 LOG_COND(log_resend_requests, "[%s] I am leaving.2.....", 1549 SHORT_UUID(del->name.value)); 1550 1551 /* 1552 * We must send any left over checkpoints before 1553 * leaving. If we don't, an incoming node could 1554 * be stuck with no checkpoint and stall. 1555 do_checkpoints(del); --- THIS COULD BE CAUSING OUR PROBLEMS: 1556 1557 - Incoming node deletes old checkpoints before joining 1558 - A stale checkpoint is issued here by leaving node 1559 - (leaving node leaves) 1560 - Incoming node joins cluster and finds stale checkpoint. 1561 - (leaving node leaves - option 2) 1562 */ 1563 do_checkpoints(del, 1); 1564 1565 state = del->state; 1566 1567 del->cpg_state = INVALID; 1568 del->state = LEAVING; 1569 1570 /* 1571 * If the state is VALID, we might be processing the 1572 * startup list. If so, we certainly don't want to 1573 * clear the startup_list here by calling abort_startup 1574 */ 1575 if (!dm_list_empty(&del->startup_list) && (state != VALID)) 1576 abort_startup(del); 1577 1578 r = cpg_leave(del->handle, &del->name); 1579 if (r != CPG_OK) 1580 LOG_ERROR("Error leaving CPG!"); 1581 return 0; 1582 } 1583 1584 int destroy_cluster_cpg(char *uuid) 1585 { 1586 struct clog_cpg *del, *tmp; 1587 1588 dm_list_iterate_items_safe(del, tmp, &clog_cpg_list) 1589 if (!strncmp(del->name.value, uuid, CPG_MAX_NAME_LENGTH)) 1590 _destroy_cluster_cpg(del); 1591 1592 return 0; 1593 } 1594 1595 int init_cluster(void) 1596 { 1597 SaAisErrorT rv; 1598 1599 dm_list_init(&clog_cpg_list); 1600 rv = saCkptInitialize(&ckpt_handle, &callbacks, &version); 1601 1602 if (rv != SA_AIS_OK) 1603 return EXIT_CLUSTER_CKPT_INIT; 1604 1605 return 0; 1606 } 1607 1608 void cleanup_cluster(void) 1609 { 1610 SaAisErrorT err; 1611 1612 err = saCkptFinalize(ckpt_handle); 1613 if (err != SA_AIS_OK) 1614 LOG_ERROR("Failed to finalize checkpoint handle"); 1615 } 1616 1617 void cluster_debug(void) 1618 { 1619 struct checkpoint_data *cp; 1620 struct clog_cpg *entry; 1621 struct clog_request *rq; 1622 int i; 1623 1624 LOG_ERROR(""); 1625 LOG_ERROR("CLUSTER COMPONENT DEBUGGING::"); 1626 dm_list_iterate_items(entry, &clog_cpg_list) { 1627 LOG_ERROR("%s::", SHORT_UUID(entry->name.value)); 1628 LOG_ERROR(" lowest_id : %u", entry->lowest_id); 1629 LOG_ERROR(" state : %s", (entry->state == INVALID) ? 1630 "INVALID" : (entry->state == VALID) ? "VALID" : 1631 (entry->state == LEAVING) ? "LEAVING" : "UNKNOWN"); 1632 LOG_ERROR(" cpg_state : %d", entry->cpg_state); 1633 LOG_ERROR(" free_me : %d", entry->free_me); 1634 LOG_ERROR(" delay : %d", entry->delay); 1635 LOG_ERROR(" resend_requests : %d", entry->resend_requests); 1636 LOG_ERROR(" checkpoints_needed: %d", entry->checkpoints_needed); 1637 for (i = 0, cp = entry->checkpoint_list; 1638 i < MAX_CHECKPOINT_REQUESTERS; i++) 1639 if (cp) 1640 cp = cp->next; 1641 else 1642 break; 1643 LOG_ERROR(" CKPTs waiting : %d", i); 1644 LOG_ERROR(" Working list:"); 1645 dm_list_iterate_items(rq, &entry->working_list) 1646 LOG_ERROR(" %s/%u", _RQ_TYPE(rq->u_rq.request_type), 1647 rq->u_rq.seq); 1648 1649 LOG_ERROR(" Startup list:"); 1650 dm_list_iterate_items(rq, &entry->startup_list) 1651 LOG_ERROR(" %s/%u", _RQ_TYPE(rq->u_rq.request_type), 1652 rq->u_rq.seq); 1653 1654 LOG_ERROR("Command History:"); 1655 for (i = 0; i < DEBUGGING_HISTORY; i++) { 1656 entry->idx++; 1657 entry->idx = entry->idx % DEBUGGING_HISTORY; 1658 if (entry->debugging[entry->idx][0] == '\0') 1659 continue; 1660 LOG_ERROR("%d:%d) %s", i, entry->idx, 1661 entry->debugging[entry->idx]); 1662 } 1663 } 1664 } 1665