1 /* 2 * HCLINK.C 3 * 4 * This module implements a simple remote control protocol 5 * 6 * $DragonFly: src/bin/cpdup/hclink.c,v 1.10 2008/05/24 17:21:36 dillon Exp $ 7 */ 8 9 #include "cpdup.h" 10 #include "hclink.h" 11 #include "hcproto.h" 12 13 #if USE_PTHREADS 14 static void * hcc_reader_thread(void *arg); 15 #endif 16 static struct HCHead *hcc_read_command(struct HostConf *hc, hctransaction_t trans); 17 static void hcc_start_reply(hctransaction_t trans, struct HCHead *rhead); 18 19 int 20 hcc_connect(struct HostConf *hc) 21 { 22 int fdin[2]; 23 int fdout[2]; 24 const char *av[16]; 25 26 if (hc == NULL || hc->host == NULL) 27 return(0); 28 29 if (pipe(fdin) < 0) 30 return(-1); 31 if (pipe(fdout) < 0) { 32 close(fdin[0]); 33 close(fdin[1]); 34 return(-1); 35 } 36 if ((hc->pid = fork()) == 0) { 37 /* 38 * Child process 39 */ 40 int n; 41 42 dup2(fdin[1], 1); 43 close(fdin[0]); 44 close(fdin[1]); 45 dup2(fdout[0], 0); 46 close(fdout[0]); 47 close(fdout[1]); 48 49 n = 0; 50 av[n++] = "ssh"; 51 if (CompressOpt) 52 av[n++] = "-C"; 53 av[n++] = "-T"; 54 av[n++] = hc->host; 55 av[n++] = "cpdup"; 56 av[n++] = "-S"; 57 av[n++] = NULL; 58 59 execv("/usr/bin/ssh", (void *)av); 60 _exit(1); 61 } else if (hc->pid < 0) { 62 return(-1); 63 } else { 64 /* 65 * Parent process. Do the initial handshake to make sure we are 66 * actually talking to a cpdup slave. 67 */ 68 close(fdin[1]); 69 hc->fdin = fdin[0]; 70 close(fdout[0]); 71 hc->fdout = fdout[1]; 72 #if USE_PTHREADS 73 pthread_create(&hc->reader_thread, NULL, hcc_reader_thread, hc); 74 #endif 75 return(0); 76 } 77 } 78 79 static int 80 rc_badop(hctransaction_t trans __unused, struct HCHead *head) 81 { 82 head->error = EOPNOTSUPP; 83 return(0); 84 } 85 86 int 87 hcc_slave(int fdin, int fdout, struct HCDesc *descs, int count) 88 { 89 struct HostConf hcslave; 90 struct HCHead *head; 91 struct HCHead *whead; 92 struct HCTransaction trans; 93 int (*dispatch[256])(hctransaction_t, struct HCHead *); 94 int aligned_bytes; 95 int i; 96 int r; 97 98 bzero(&hcslave, sizeof(hcslave)); 99 bzero(&trans, sizeof(trans)); 100 for (i = 0; i < count; ++i) { 101 struct HCDesc *desc = &descs[i]; 102 assert(desc->cmd >= 0 && desc->cmd < 256); 103 dispatch[desc->cmd] = desc->func; 104 } 105 for (i = 0; i < 256; ++i) { 106 if (dispatch[i] == NULL) 107 dispatch[i] = rc_badop; 108 } 109 hcslave.fdin = fdin; 110 hcslave.fdout = fdout; 111 trans.hc = &hcslave; 112 113 #if USE_PTHREADS 114 pthread_mutex_unlock(&MasterMutex); 115 #endif 116 /* 117 * Process commands on fdin and write out results on fdout 118 */ 119 for (;;) { 120 /* 121 * Get the command 122 */ 123 head = hcc_read_command(trans.hc, &trans); 124 if (head == NULL) 125 break; 126 127 /* 128 * Start the reply and dispatch, then process the return code. 129 */ 130 head->error = 0; 131 hcc_start_reply(&trans, head); 132 133 r = dispatch[head->cmd & 255](&trans, head); 134 135 switch(r) { 136 case -2: 137 head->error = EINVAL; 138 break; 139 case -1: 140 head->error = errno; 141 break; 142 case 0: 143 break; 144 default: 145 assert(0); 146 break; 147 } 148 149 /* 150 * Write out the reply 151 */ 152 whead = (void *)trans.wbuf; 153 whead->bytes = trans.windex; 154 whead->error = head->error; 155 aligned_bytes = HCC_ALIGN(trans.windex); 156 #ifdef DEBUG 157 hcc_debug_dump(whead); 158 #endif 159 if (write(hcslave.fdout, whead, aligned_bytes) != aligned_bytes) 160 break; 161 } 162 return(0); 163 } 164 165 #if USE_PTHREADS 166 /* 167 * This thread collects responses from the link. It is run without 168 * the MasterMutex. 169 */ 170 static void * 171 hcc_reader_thread(void *arg) 172 { 173 struct HostConf *hc = arg; 174 struct HCHead *rhead; 175 hctransaction_t scan; 176 int i; 177 178 pthread_detach(pthread_self()); 179 while (hcc_read_command(hc, NULL) != NULL) 180 ; 181 hc->reader_thread = NULL; 182 183 /* 184 * Clean up any threads stuck waiting for a reply. 185 */ 186 pthread_mutex_lock(&MasterMutex); 187 for (i = 0; i < HCTHASH_SIZE; ++i) { 188 pthread_mutex_lock(&hc->hct_mutex[i]); 189 for (scan = hc->hct_hash[i]; scan; scan = scan->next) { 190 if (scan->state == HCT_SENT) { 191 scan->state = HCT_REPLIED; 192 rhead = (void *)scan->rbuf; 193 rhead->error = ENOTCONN; 194 if (scan->waiting) 195 pthread_cond_signal(&scan->cond); 196 } 197 } 198 pthread_mutex_unlock(&hc->hct_mutex[i]); 199 } 200 pthread_mutex_unlock(&MasterMutex); 201 return(NULL); 202 } 203 204 #endif 205 206 /* 207 * This reads a command from fdin, fixes up the byte ordering, and returns 208 * a pointer to HCHead. 209 * 210 * The MasterMutex may or may not be held. When threaded this command 211 * is serialized by a reader thread. 212 */ 213 static 214 struct HCHead * 215 hcc_read_command(struct HostConf *hc, hctransaction_t trans) 216 { 217 hctransaction_t fill; 218 struct HCHead tmp; 219 int aligned_bytes; 220 int n; 221 int r; 222 223 n = 0; 224 while (n < (int)sizeof(struct HCHead)) { 225 r = read(hc->fdin, (char *)&tmp + n, sizeof(struct HCHead) - n); 226 if (r <= 0) 227 goto fail; 228 n += r; 229 } 230 231 assert(tmp.bytes >= (int)sizeof(tmp) && tmp.bytes < 65536); 232 assert(tmp.magic == HCMAGIC); 233 234 if (trans) { 235 fill = trans; 236 } else { 237 #if USE_PTHREADS 238 pthread_mutex_lock(&hc->hct_mutex[tmp.id & HCTHASH_MASK]); 239 for (fill = hc->hct_hash[tmp.id & HCTHASH_MASK]; 240 fill; 241 fill = fill->next) 242 { 243 if (fill->state == HCT_SENT && fill->id == tmp.id) 244 break; 245 } 246 pthread_mutex_unlock(&hc->hct_mutex[tmp.id & HCTHASH_MASK]); 247 if (fill == NULL) 248 #endif 249 { 250 fprintf(stderr, 251 "cpdup hlink protocol error with %s (%04x %04x)\n", 252 hc->host, trans->id, tmp.id); 253 exit(1); 254 } 255 } 256 257 bcopy(&tmp, fill->rbuf, n); 258 aligned_bytes = HCC_ALIGN(tmp.bytes); 259 260 while (n < aligned_bytes) { 261 r = read(hc->fdin, fill->rbuf + n, aligned_bytes - n); 262 if (r <= 0) 263 goto fail; 264 n += r; 265 } 266 #ifdef DEBUG 267 hcc_debug_dump(head); 268 #endif 269 #if USE_PTHREADS 270 pthread_mutex_lock(&hc->hct_mutex[fill->id & HCTHASH_MASK]); 271 #endif 272 fill->state = HCT_REPLIED; 273 #if USE_PTHREADS 274 if (fill->waiting) 275 pthread_cond_signal(&fill->cond); 276 pthread_mutex_unlock(&hc->hct_mutex[fill->id & HCTHASH_MASK]); 277 #endif 278 return((void *)fill->rbuf); 279 fail: 280 return(NULL); 281 } 282 283 #if USE_PTHREADS 284 285 static 286 hctransaction_t 287 hcc_get_trans(struct HostConf *hc) 288 { 289 hctransaction_t trans; 290 hctransaction_t scan; 291 pthread_t tid = pthread_self(); 292 int i; 293 294 i = ((intptr_t)tid >> 7) & HCTHASH_MASK; 295 296 pthread_mutex_lock(&hc->hct_mutex[i]); 297 for (trans = hc->hct_hash[i]; trans; trans = trans->next) { 298 if (trans->tid == tid) 299 break; 300 } 301 if (trans == NULL) { 302 trans = malloc(sizeof(*trans)); 303 bzero(trans, sizeof(*trans)); 304 trans->tid = tid; 305 trans->id = i; 306 pthread_cond_init(&trans->cond, NULL); 307 do { 308 for (scan = hc->hct_hash[i]; scan; scan = scan->next) { 309 if (scan->id == trans->id) { 310 trans->id += HCTHASH_SIZE; 311 break; 312 } 313 } 314 } while (scan != NULL); 315 316 trans->next = hc->hct_hash[i]; 317 hc->hct_hash[i] = trans; 318 } 319 pthread_mutex_unlock(&hc->hct_mutex[i]); 320 return(trans); 321 } 322 323 void 324 hcc_free_trans(struct HostConf *hc) 325 { 326 hctransaction_t trans; 327 hctransaction_t *transp; 328 pthread_t tid = pthread_self(); 329 int i; 330 331 i = ((intptr_t)tid >> 7) & HCTHASH_MASK; 332 333 pthread_mutex_lock(&hc->hct_mutex[i]); 334 for (transp = &hc->hct_hash[i]; *transp; transp = &trans->next) { 335 trans = *transp; 336 if (trans->tid == tid) { 337 *transp = trans->next; 338 pthread_cond_destroy(&trans->cond); 339 free(trans); 340 break; 341 } 342 } 343 pthread_mutex_unlock(&hc->hct_mutex[i]); 344 } 345 346 #else 347 348 static 349 hctransaction_t 350 hcc_get_trans(struct HostConf *hc) 351 { 352 return(&hc->trans); 353 } 354 355 void 356 hcc_free_trans(struct HostConf *hc __unused) 357 { 358 /* nop */ 359 } 360 361 #endif 362 363 /* 364 * Initialize for a new command 365 */ 366 hctransaction_t 367 hcc_start_command(struct HostConf *hc, int16_t cmd) 368 { 369 struct HCHead *whead; 370 hctransaction_t trans; 371 372 trans = hcc_get_trans(hc); 373 374 whead = (void *)trans->wbuf; 375 whead->magic = HCMAGIC; 376 whead->bytes = 0; 377 whead->cmd = cmd; 378 whead->id = trans->id; 379 whead->error = 0; 380 381 trans->windex = sizeof(*whead); 382 trans->hc = hc; 383 trans->state = HCT_IDLE; 384 385 return(trans); 386 } 387 388 static void 389 hcc_start_reply(hctransaction_t trans, struct HCHead *rhead) 390 { 391 struct HCHead *whead = (void *)trans->wbuf; 392 393 whead->magic = HCMAGIC; 394 whead->bytes = 0; 395 whead->cmd = rhead->cmd | HCF_REPLY; 396 whead->id = rhead->id; 397 whead->error = 0; 398 399 trans->windex = sizeof(*whead); 400 } 401 402 /* 403 * Finish constructing a command, transmit it, and await the reply. 404 * Return the HCHead of the reply. 405 */ 406 struct HCHead * 407 hcc_finish_command(hctransaction_t trans) 408 { 409 struct HostConf *hc; 410 struct HCHead *whead; 411 struct HCHead *rhead; 412 int aligned_bytes; 413 int16_t wcmd; 414 415 hc = trans->hc; 416 whead = (void *)trans->wbuf; 417 whead->bytes = trans->windex; 418 aligned_bytes = HCC_ALIGN(trans->windex); 419 420 trans->state = HCT_SENT; 421 422 if (write(hc->fdout, whead, aligned_bytes) != aligned_bytes) { 423 #ifdef __error 424 *__error = EIO; 425 #else 426 errno = EIO; 427 #endif 428 if (whead->cmd < 0x0010) 429 return(NULL); 430 fprintf(stderr, "cpdup lost connection to %s\n", hc->host); 431 exit(1); 432 } 433 434 wcmd = whead->cmd; 435 436 /* 437 * whead is invalid when we call hcc_read_command() because 438 * we may switch to another thread. 439 */ 440 #if USE_PTHREADS 441 pthread_mutex_unlock(&MasterMutex); 442 while (trans->state != HCT_REPLIED && hc->reader_thread) { 443 pthread_mutex_t *mtxp = &hc->hct_mutex[trans->id & HCTHASH_MASK]; 444 pthread_mutex_lock(mtxp); 445 trans->waiting = 1; 446 if (trans->state != HCT_REPLIED && hc->reader_thread) 447 pthread_cond_wait(&trans->cond, mtxp); 448 trans->waiting = 0; 449 pthread_mutex_unlock(mtxp); 450 } 451 pthread_mutex_lock(&MasterMutex); 452 rhead = (void *)trans->rbuf; 453 #else 454 rhead = hcc_read_command(hc, trans); 455 #endif 456 if (trans->state != HCT_REPLIED || rhead->id != trans->id) { 457 #ifdef __error 458 *__error = EIO; 459 #else 460 errno = EIO; 461 #endif 462 if (wcmd < 0x0010) 463 return(NULL); 464 fprintf(stderr, "cpdup lost connection to %s\n", hc->host); 465 exit(1); 466 } 467 trans->state = HCT_DONE; 468 469 if (rhead->error) { 470 #ifdef __error 471 *__error = rhead->error; 472 #else 473 errno = rhead->error; 474 #endif 475 } 476 return (rhead); 477 } 478 479 void 480 hcc_leaf_string(hctransaction_t trans, int16_t leafid, const char *str) 481 { 482 struct HCLeaf *item; 483 int bytes = strlen(str) + 1; 484 485 item = (void *)(trans->wbuf + trans->windex); 486 assert(trans->windex + sizeof(*item) + bytes < 65536); 487 item->leafid = leafid; 488 item->reserved = 0; 489 item->bytes = sizeof(*item) + bytes; 490 bcopy(str, item + 1, bytes); 491 trans->windex = HCC_ALIGN(trans->windex + item->bytes); 492 } 493 494 void 495 hcc_leaf_data(hctransaction_t trans, int16_t leafid, const void *ptr, int bytes) 496 { 497 struct HCLeaf *item; 498 499 item = (void *)(trans->wbuf + trans->windex); 500 assert(trans->windex + sizeof(*item) + bytes < 65536); 501 item->leafid = leafid; 502 item->reserved = 0; 503 item->bytes = sizeof(*item) + bytes; 504 bcopy(ptr, item + 1, bytes); 505 trans->windex = HCC_ALIGN(trans->windex + item->bytes); 506 } 507 508 void 509 hcc_leaf_int32(hctransaction_t trans, int16_t leafid, int32_t value) 510 { 511 struct HCLeaf *item; 512 513 item = (void *)(trans->wbuf + trans->windex); 514 assert(trans->windex + sizeof(*item) + sizeof(value) < 65536); 515 item->leafid = leafid; 516 item->reserved = 0; 517 item->bytes = sizeof(*item) + sizeof(value); 518 *(int32_t *)(item + 1) = value; 519 trans->windex = HCC_ALIGN(trans->windex + item->bytes); 520 } 521 522 void 523 hcc_leaf_int64(hctransaction_t trans, int16_t leafid, int64_t value) 524 { 525 struct HCLeaf *item; 526 527 item = (void *)(trans->wbuf + trans->windex); 528 assert(trans->windex + sizeof(*item) + sizeof(value) < 65536); 529 item->leafid = leafid; 530 item->reserved = 0; 531 item->bytes = sizeof(*item) + sizeof(value); 532 *(int64_t *)(item + 1) = value; 533 trans->windex = HCC_ALIGN(trans->windex + item->bytes); 534 } 535 536 int 537 hcc_alloc_descriptor(struct HostConf *hc, void *ptr, int type) 538 { 539 struct HCHostDesc *hd; 540 struct HCHostDesc *hnew; 541 542 hnew = malloc(sizeof(struct HCHostDesc)); 543 hnew->type = type; 544 hnew->data = ptr; 545 546 if ((hd = hc->hostdescs) != NULL) { 547 hnew->desc = hd->desc + 1; 548 } else { 549 hnew->desc = 1; 550 } 551 hnew->next = hd; 552 hc->hostdescs = hnew; 553 return(hnew->desc); 554 } 555 556 void * 557 hcc_get_descriptor(struct HostConf *hc, int desc, int type) 558 { 559 struct HCHostDesc *hd; 560 561 for (hd = hc->hostdescs; hd; hd = hd->next) { 562 if (hd->desc == desc && hd->type == type) 563 return(hd->data); 564 } 565 return(NULL); 566 } 567 568 void 569 hcc_set_descriptor(struct HostConf *hc, int desc, void *ptr, int type) 570 { 571 struct HCHostDesc *hd; 572 struct HCHostDesc **hdp; 573 574 for (hdp = &hc->hostdescs; (hd = *hdp) != NULL; hdp = &hd->next) { 575 if (hd->desc == desc) { 576 if (ptr) { 577 hd->data = ptr; 578 hd->type = type; 579 } else { 580 *hdp = hd->next; 581 free(hd); 582 } 583 return; 584 } 585 } 586 if (ptr) { 587 hd = malloc(sizeof(*hd)); 588 hd->desc = desc; 589 hd->type = type; 590 hd->data = ptr; 591 hd->next = hc->hostdescs; 592 hc->hostdescs = hd; 593 } 594 } 595 596 struct HCLeaf * 597 hcc_firstitem(struct HCHead *head) 598 { 599 struct HCLeaf *item; 600 int offset; 601 602 offset = sizeof(*head); 603 if (offset == head->bytes) 604 return(NULL); 605 assert(head->bytes >= offset + (int)sizeof(*item)); 606 item = (void *)(head + 1); 607 assert(head->bytes >= offset + item->bytes); 608 assert(item->bytes >= (int)sizeof(*item) && item->bytes < 65536 - offset); 609 return (item); 610 } 611 612 struct HCLeaf * 613 hcc_nextitem(struct HCHead *head, struct HCLeaf *item) 614 { 615 int offset; 616 617 item = (void *)((char *)item + HCC_ALIGN(item->bytes)); 618 offset = (char *)item - (char *)head; 619 if (offset == head->bytes) 620 return(NULL); 621 assert(head->bytes >= offset + (int)sizeof(*item)); 622 assert(head->bytes >= offset + item->bytes); 623 assert(item->bytes >= (int)sizeof(*item) && item->bytes < 65536 - offset); 624 return (item); 625 } 626 627 #ifdef DEBUG 628 629 void 630 hcc_debug_dump(struct HCHead *head) 631 { 632 struct HCLeaf *item; 633 int aligned_bytes = HCC_ALIGN(head->bytes); 634 635 fprintf(stderr, "DUMP %04x (%d)", (u_int16_t)head->cmd, aligned_bytes); 636 if (head->cmd & HCF_REPLY) 637 fprintf(stderr, " error %d", head->error); 638 fprintf(stderr, "\n"); 639 for (item = hcc_firstitem(head); item; item = hcc_nextitem(head, item)) { 640 fprintf(stderr, " ITEM %04x DATA ", item->leafid); 641 switch(item->leafid & LCF_TYPEMASK) { 642 case LCF_INT32: 643 fprintf(stderr, "int32 %d\n", *(int32_t *)(item + 1)); 644 break; 645 case LCF_INT64: 646 fprintf(stderr, "int64 %lld\n", *(int64_t *)(item + 1)); 647 break; 648 case LCF_STRING: 649 fprintf(stderr, "\"%s\"\n", (char *)(item + 1)); 650 break; 651 case LCF_BINARY: 652 fprintf(stderr, "(binary)\n"); 653 break; 654 default: 655 printf("?\n"); 656 } 657 } 658 } 659 660 #endif 661