xref: /dragonfly/bin/cpdup/hclink.c (revision 3641b7ca)
1 /*
2  * HCLINK.C
3  *
4  * This module implements a simple remote control protocol
5  *
6  * $DragonFly: src/bin/cpdup/hclink.c,v 1.10 2008/05/24 17:21:36 dillon Exp $
7  */
8 
9 #include "cpdup.h"
10 #include "hclink.h"
11 #include "hcproto.h"
12 
13 #if USE_PTHREADS
14 static void * hcc_reader_thread(void *arg);
15 #endif
16 static struct HCHead *hcc_read_command(struct HostConf *hc, hctransaction_t trans);
17 static void hcc_start_reply(hctransaction_t trans, struct HCHead *rhead);
18 
19 int
20 hcc_connect(struct HostConf *hc)
21 {
22     int fdin[2];
23     int fdout[2];
24     const char *av[16];
25 
26     if (hc == NULL || hc->host == NULL)
27 	return(0);
28 
29     if (pipe(fdin) < 0)
30 	return(-1);
31     if (pipe(fdout) < 0) {
32 	close(fdin[0]);
33 	close(fdin[1]);
34 	return(-1);
35     }
36     if ((hc->pid = fork()) == 0) {
37 	/*
38 	 * Child process
39 	 */
40 	int n;
41 
42 	dup2(fdin[1], 1);
43 	close(fdin[0]);
44 	close(fdin[1]);
45 	dup2(fdout[0], 0);
46 	close(fdout[0]);
47 	close(fdout[1]);
48 
49 	n = 0;
50 	av[n++] = "ssh";
51 	if (CompressOpt)
52 	    av[n++] = "-C";
53 	av[n++] = "-T";
54 	av[n++] = hc->host;
55 	av[n++] = "cpdup";
56 	av[n++] = "-S";
57 	av[n++] = NULL;
58 
59 	execv("/usr/bin/ssh", (void *)av);
60 	_exit(1);
61     } else if (hc->pid < 0) {
62 	return(-1);
63     } else {
64 	/*
65 	 * Parent process.  Do the initial handshake to make sure we are
66 	 * actually talking to a cpdup slave.
67 	 */
68 	close(fdin[1]);
69 	hc->fdin = fdin[0];
70 	close(fdout[0]);
71 	hc->fdout = fdout[1];
72 #if USE_PTHREADS
73 	pthread_create(&hc->reader_thread, NULL, hcc_reader_thread, hc);
74 #endif
75 	return(0);
76     }
77 }
78 
79 static int
80 rc_badop(hctransaction_t trans __unused, struct HCHead *head)
81 {
82     head->error = EOPNOTSUPP;
83     return(0);
84 }
85 
86 int
87 hcc_slave(int fdin, int fdout, struct HCDesc *descs, int count)
88 {
89     struct HostConf hcslave;
90     struct HCHead *head;
91     struct HCHead *whead;
92     struct HCTransaction trans;
93     int (*dispatch[256])(hctransaction_t, struct HCHead *);
94     int aligned_bytes;
95     int i;
96     int r;
97 
98     bzero(&hcslave, sizeof(hcslave));
99     bzero(&trans, sizeof(trans));
100     for (i = 0; i < count; ++i) {
101 	struct HCDesc *desc = &descs[i];
102 	assert(desc->cmd >= 0 && desc->cmd < 256);
103 	dispatch[desc->cmd] = desc->func;
104     }
105     for (i = 0; i < 256; ++i) {
106 	if (dispatch[i] == NULL)
107 	    dispatch[i] = rc_badop;
108     }
109     hcslave.fdin = fdin;
110     hcslave.fdout = fdout;
111     trans.hc = &hcslave;
112 
113 #if USE_PTHREADS
114     pthread_mutex_unlock(&MasterMutex);
115 #endif
116     /*
117      * Process commands on fdin and write out results on fdout
118      */
119     for (;;) {
120 	/*
121 	 * Get the command
122 	 */
123 	head = hcc_read_command(trans.hc, &trans);
124 	if (head == NULL)
125 	    break;
126 
127 	/*
128 	 * Start the reply and dispatch, then process the return code.
129 	 */
130 	head->error = 0;
131 	hcc_start_reply(&trans, head);
132 
133 	r = dispatch[head->cmd & 255](&trans, head);
134 
135 	switch(r) {
136 	case -2:
137 		head->error = EINVAL;
138 		break;
139 	case -1:
140 		head->error = errno;
141 		break;
142 	case 0:
143 		break;
144 	default:
145 		assert(0);
146 		break;
147 	}
148 
149 	/*
150 	 * Write out the reply
151 	 */
152 	whead = (void *)trans.wbuf;
153 	whead->bytes = trans.windex;
154 	whead->error = head->error;
155 	aligned_bytes = HCC_ALIGN(trans.windex);
156 #ifdef DEBUG
157 	hcc_debug_dump(whead);
158 #endif
159 	if (write(hcslave.fdout, whead, aligned_bytes) != aligned_bytes)
160 	    break;
161     }
162     return(0);
163 }
164 
165 #if USE_PTHREADS
166 /*
167  * This thread collects responses from the link.  It is run without
168  * the MasterMutex.
169  */
170 static void *
171 hcc_reader_thread(void *arg)
172 {
173     struct HostConf *hc = arg;
174     struct HCHead *rhead;
175     hctransaction_t scan;
176     int i;
177 
178     pthread_detach(pthread_self());
179     while (hcc_read_command(hc, NULL) != NULL)
180 	;
181     hc->reader_thread = NULL;
182 
183     /*
184      * Clean up any threads stuck waiting for a reply.
185      */
186     pthread_mutex_lock(&MasterMutex);
187     for (i = 0; i < HCTHASH_SIZE; ++i) {
188 	pthread_mutex_lock(&hc->hct_mutex[i]);
189 	for (scan = hc->hct_hash[i]; scan; scan = scan->next) {
190 	    if (scan->state == HCT_SENT) {
191 		scan->state = HCT_REPLIED;
192 		rhead = (void *)scan->rbuf;
193 		rhead->error = ENOTCONN;
194 		if (scan->waiting)
195 		    pthread_cond_signal(&scan->cond);
196 	    }
197 	}
198 	pthread_mutex_unlock(&hc->hct_mutex[i]);
199     }
200     pthread_mutex_unlock(&MasterMutex);
201     return(NULL);
202 }
203 
204 #endif
205 
206 /*
207  * This reads a command from fdin, fixes up the byte ordering, and returns
208  * a pointer to HCHead.
209  *
210  * The MasterMutex may or may not be held.  When threaded this command
211  * is serialized by a reader thread.
212  */
213 static
214 struct HCHead *
215 hcc_read_command(struct HostConf *hc, hctransaction_t trans)
216 {
217     hctransaction_t fill;
218     struct HCHead tmp;
219     int aligned_bytes;
220     int n;
221     int r;
222 
223     n = 0;
224     while (n < (int)sizeof(struct HCHead)) {
225 	r = read(hc->fdin, (char *)&tmp + n, sizeof(struct HCHead) - n);
226 	if (r <= 0)
227 	    goto fail;
228 	n += r;
229     }
230 
231     assert(tmp.bytes >= (int)sizeof(tmp) && tmp.bytes < 65536);
232     assert(tmp.magic == HCMAGIC);
233 
234     if (trans) {
235 	fill = trans;
236     } else {
237 #if USE_PTHREADS
238 	pthread_mutex_lock(&hc->hct_mutex[tmp.id & HCTHASH_MASK]);
239 	for (fill = hc->hct_hash[tmp.id & HCTHASH_MASK];
240 	     fill;
241 	     fill = fill->next)
242 	{
243 	    if (fill->state == HCT_SENT && fill->id == tmp.id)
244 		    break;
245 	}
246 	pthread_mutex_unlock(&hc->hct_mutex[tmp.id & HCTHASH_MASK]);
247 	if (fill == NULL)
248 #endif
249 	{
250 	    fprintf(stderr,
251 		    "cpdup hlink protocol error with %s (%04x %04x)\n",
252 		    hc->host, trans->id, tmp.id);
253 	    exit(1);
254 	}
255     }
256 
257     bcopy(&tmp, fill->rbuf, n);
258     aligned_bytes = HCC_ALIGN(tmp.bytes);
259 
260     while (n < aligned_bytes) {
261 	r = read(hc->fdin, fill->rbuf + n, aligned_bytes - n);
262 	if (r <= 0)
263 	    goto fail;
264 	n += r;
265     }
266 #ifdef DEBUG
267     hcc_debug_dump(head);
268 #endif
269 #if USE_PTHREADS
270     pthread_mutex_lock(&hc->hct_mutex[fill->id & HCTHASH_MASK]);
271 #endif
272     fill->state = HCT_REPLIED;
273 #if USE_PTHREADS
274     if (fill->waiting)
275 	pthread_cond_signal(&fill->cond);
276     pthread_mutex_unlock(&hc->hct_mutex[fill->id & HCTHASH_MASK]);
277 #endif
278     return((void *)fill->rbuf);
279 fail:
280     return(NULL);
281 }
282 
283 #if USE_PTHREADS
284 
285 static
286 hctransaction_t
287 hcc_get_trans(struct HostConf *hc)
288 {
289     hctransaction_t trans;
290     hctransaction_t scan;
291     pthread_t tid = pthread_self();
292     int i;
293 
294     i = ((intptr_t)tid >> 7) & HCTHASH_MASK;
295 
296     pthread_mutex_lock(&hc->hct_mutex[i]);
297     for (trans = hc->hct_hash[i]; trans; trans = trans->next) {
298 	if (trans->tid == tid)
299 		break;
300     }
301     if (trans == NULL) {
302 	trans = malloc(sizeof(*trans));
303 	bzero(trans, sizeof(*trans));
304 	trans->tid = tid;
305 	trans->id = i;
306 	pthread_cond_init(&trans->cond, NULL);
307 	do {
308 		for (scan = hc->hct_hash[i]; scan; scan = scan->next) {
309 			if (scan->id == trans->id) {
310 				trans->id += HCTHASH_SIZE;
311 				break;
312 			}
313 		}
314 	} while (scan != NULL);
315 
316 	trans->next = hc->hct_hash[i];
317 	hc->hct_hash[i] = trans;
318     }
319     pthread_mutex_unlock(&hc->hct_mutex[i]);
320     return(trans);
321 }
322 
323 void
324 hcc_free_trans(struct HostConf *hc)
325 {
326     hctransaction_t trans;
327     hctransaction_t *transp;
328     pthread_t tid = pthread_self();
329     int i;
330 
331     i = ((intptr_t)tid >> 7) & HCTHASH_MASK;
332 
333     pthread_mutex_lock(&hc->hct_mutex[i]);
334     for (transp = &hc->hct_hash[i]; *transp; transp = &trans->next) {
335 	trans = *transp;
336 	if (trans->tid == tid) {
337 		*transp = trans->next;
338 		pthread_cond_destroy(&trans->cond);
339 		free(trans);
340 		break;
341 	}
342     }
343     pthread_mutex_unlock(&hc->hct_mutex[i]);
344 }
345 
346 #else
347 
348 static
349 hctransaction_t
350 hcc_get_trans(struct HostConf *hc)
351 {
352     return(&hc->trans);
353 }
354 
355 void
356 hcc_free_trans(struct HostConf *hc __unused)
357 {
358     /* nop */
359 }
360 
361 #endif
362 
363 /*
364  * Initialize for a new command
365  */
366 hctransaction_t
367 hcc_start_command(struct HostConf *hc, int16_t cmd)
368 {
369     struct HCHead *whead;
370     hctransaction_t trans;
371 
372     trans = hcc_get_trans(hc);
373 
374     whead = (void *)trans->wbuf;
375     whead->magic = HCMAGIC;
376     whead->bytes = 0;
377     whead->cmd = cmd;
378     whead->id = trans->id;
379     whead->error = 0;
380 
381     trans->windex = sizeof(*whead);
382     trans->hc = hc;
383     trans->state = HCT_IDLE;
384 
385     return(trans);
386 }
387 
388 static void
389 hcc_start_reply(hctransaction_t trans, struct HCHead *rhead)
390 {
391     struct HCHead *whead = (void *)trans->wbuf;
392 
393     whead->magic = HCMAGIC;
394     whead->bytes = 0;
395     whead->cmd = rhead->cmd | HCF_REPLY;
396     whead->id = rhead->id;
397     whead->error = 0;
398 
399     trans->windex = sizeof(*whead);
400 }
401 
402 /*
403  * Finish constructing a command, transmit it, and await the reply.
404  * Return the HCHead of the reply.
405  */
406 struct HCHead *
407 hcc_finish_command(hctransaction_t trans)
408 {
409     struct HostConf *hc;
410     struct HCHead *whead;
411     struct HCHead *rhead;
412     int aligned_bytes;
413     int16_t wcmd;
414 
415     hc = trans->hc;
416     whead = (void *)trans->wbuf;
417     whead->bytes = trans->windex;
418     aligned_bytes = HCC_ALIGN(trans->windex);
419 
420     trans->state = HCT_SENT;
421 
422     if (write(hc->fdout, whead, aligned_bytes) != aligned_bytes) {
423 #ifdef __error
424 	*__error = EIO;
425 #else
426 	errno = EIO;
427 #endif
428 	if (whead->cmd < 0x0010)
429 		return(NULL);
430 	fprintf(stderr, "cpdup lost connection to %s\n", hc->host);
431 	exit(1);
432     }
433 
434     wcmd = whead->cmd;
435 
436     /*
437      * whead is invalid when we call hcc_read_command() because
438      * we may switch to another thread.
439      */
440 #if USE_PTHREADS
441     pthread_mutex_unlock(&MasterMutex);
442     while (trans->state != HCT_REPLIED && hc->reader_thread) {
443 	pthread_mutex_t *mtxp = &hc->hct_mutex[trans->id & HCTHASH_MASK];
444 	pthread_mutex_lock(mtxp);
445 	trans->waiting = 1;
446 	if (trans->state != HCT_REPLIED && hc->reader_thread)
447 		pthread_cond_wait(&trans->cond, mtxp);
448 	trans->waiting = 0;
449 	pthread_mutex_unlock(mtxp);
450     }
451     pthread_mutex_lock(&MasterMutex);
452     rhead = (void *)trans->rbuf;
453 #else
454     rhead = hcc_read_command(hc, trans);
455 #endif
456     if (trans->state != HCT_REPLIED || rhead->id != trans->id) {
457 #ifdef __error
458 	*__error = EIO;
459 #else
460 	errno = EIO;
461 #endif
462 	if (wcmd < 0x0010)
463 		return(NULL);
464 	fprintf(stderr, "cpdup lost connection to %s\n", hc->host);
465 	exit(1);
466     }
467     trans->state = HCT_DONE;
468 
469     if (rhead->error) {
470 #ifdef __error
471 	*__error = rhead->error;
472 #else
473 	errno = rhead->error;
474 #endif
475     }
476     return (rhead);
477 }
478 
479 void
480 hcc_leaf_string(hctransaction_t trans, int16_t leafid, const char *str)
481 {
482     struct HCLeaf *item;
483     int bytes = strlen(str) + 1;
484 
485     item = (void *)(trans->wbuf + trans->windex);
486     assert(trans->windex + sizeof(*item) + bytes < 65536);
487     item->leafid = leafid;
488     item->reserved = 0;
489     item->bytes = sizeof(*item) + bytes;
490     bcopy(str, item + 1, bytes);
491     trans->windex = HCC_ALIGN(trans->windex + item->bytes);
492 }
493 
494 void
495 hcc_leaf_data(hctransaction_t trans, int16_t leafid, const void *ptr, int bytes)
496 {
497     struct HCLeaf *item;
498 
499     item = (void *)(trans->wbuf + trans->windex);
500     assert(trans->windex + sizeof(*item) + bytes < 65536);
501     item->leafid = leafid;
502     item->reserved = 0;
503     item->bytes = sizeof(*item) + bytes;
504     bcopy(ptr, item + 1, bytes);
505     trans->windex = HCC_ALIGN(trans->windex + item->bytes);
506 }
507 
508 void
509 hcc_leaf_int32(hctransaction_t trans, int16_t leafid, int32_t value)
510 {
511     struct HCLeaf *item;
512 
513     item = (void *)(trans->wbuf + trans->windex);
514     assert(trans->windex + sizeof(*item) + sizeof(value) < 65536);
515     item->leafid = leafid;
516     item->reserved = 0;
517     item->bytes = sizeof(*item) + sizeof(value);
518     *(int32_t *)(item + 1) = value;
519     trans->windex = HCC_ALIGN(trans->windex + item->bytes);
520 }
521 
522 void
523 hcc_leaf_int64(hctransaction_t trans, int16_t leafid, int64_t value)
524 {
525     struct HCLeaf *item;
526 
527     item = (void *)(trans->wbuf + trans->windex);
528     assert(trans->windex + sizeof(*item) + sizeof(value) < 65536);
529     item->leafid = leafid;
530     item->reserved = 0;
531     item->bytes = sizeof(*item) + sizeof(value);
532     *(int64_t *)(item + 1) = value;
533     trans->windex = HCC_ALIGN(trans->windex + item->bytes);
534 }
535 
536 int
537 hcc_alloc_descriptor(struct HostConf *hc, void *ptr, int type)
538 {
539     struct HCHostDesc *hd;
540     struct HCHostDesc *hnew;
541 
542     hnew = malloc(sizeof(struct HCHostDesc));
543     hnew->type = type;
544     hnew->data = ptr;
545 
546     if ((hd = hc->hostdescs) != NULL) {
547 	hnew->desc = hd->desc + 1;
548     } else {
549 	hnew->desc = 1;
550     }
551     hnew->next = hd;
552     hc->hostdescs = hnew;
553     return(hnew->desc);
554 }
555 
556 void *
557 hcc_get_descriptor(struct HostConf *hc, int desc, int type)
558 {
559     struct HCHostDesc *hd;
560 
561     for (hd = hc->hostdescs; hd; hd = hd->next) {
562 	if (hd->desc == desc && hd->type == type)
563 	    return(hd->data);
564     }
565     return(NULL);
566 }
567 
568 void
569 hcc_set_descriptor(struct HostConf *hc, int desc, void *ptr, int type)
570 {
571     struct HCHostDesc *hd;
572     struct HCHostDesc **hdp;
573 
574     for (hdp = &hc->hostdescs; (hd = *hdp) != NULL; hdp = &hd->next) {
575 	if (hd->desc == desc) {
576 	    if (ptr) {
577 		hd->data = ptr;
578 		hd->type = type;
579 	    } else {
580 		*hdp = hd->next;
581 		free(hd);
582 	    }
583 	    return;
584 	}
585     }
586     if (ptr) {
587 	hd = malloc(sizeof(*hd));
588 	hd->desc = desc;
589 	hd->type = type;
590 	hd->data = ptr;
591 	hd->next = hc->hostdescs;
592 	hc->hostdescs = hd;
593     }
594 }
595 
596 struct HCLeaf *
597 hcc_firstitem(struct HCHead *head)
598 {
599     struct HCLeaf *item;
600     int offset;
601 
602     offset = sizeof(*head);
603     if (offset == head->bytes)
604 	return(NULL);
605     assert(head->bytes >= offset + (int)sizeof(*item));
606     item = (void *)(head + 1);
607     assert(head->bytes >= offset + item->bytes);
608     assert(item->bytes >= (int)sizeof(*item) && item->bytes < 65536 - offset);
609     return (item);
610 }
611 
612 struct HCLeaf *
613 hcc_nextitem(struct HCHead *head, struct HCLeaf *item)
614 {
615     int offset;
616 
617     item = (void *)((char *)item + HCC_ALIGN(item->bytes));
618     offset = (char *)item - (char *)head;
619     if (offset == head->bytes)
620 	return(NULL);
621     assert(head->bytes >= offset + (int)sizeof(*item));
622     assert(head->bytes >= offset + item->bytes);
623     assert(item->bytes >= (int)sizeof(*item) && item->bytes < 65536 - offset);
624     return (item);
625 }
626 
627 #ifdef DEBUG
628 
629 void
630 hcc_debug_dump(struct HCHead *head)
631 {
632     struct HCLeaf *item;
633     int aligned_bytes = HCC_ALIGN(head->bytes);
634 
635     fprintf(stderr, "DUMP %04x (%d)", (u_int16_t)head->cmd, aligned_bytes);
636     if (head->cmd & HCF_REPLY)
637 	fprintf(stderr, " error %d", head->error);
638     fprintf(stderr, "\n");
639     for (item = hcc_firstitem(head); item; item = hcc_nextitem(head, item)) {
640 	fprintf(stderr, "    ITEM %04x DATA ", item->leafid);
641 	switch(item->leafid & LCF_TYPEMASK) {
642 	case LCF_INT32:
643 	    fprintf(stderr, "int32 %d\n", *(int32_t *)(item + 1));
644 	    break;
645 	case LCF_INT64:
646 	    fprintf(stderr, "int64 %lld\n", *(int64_t *)(item + 1));
647 	    break;
648 	case LCF_STRING:
649 	    fprintf(stderr, "\"%s\"\n", (char *)(item + 1));
650 	    break;
651 	case LCF_BINARY:
652 	    fprintf(stderr, "(binary)\n");
653 	    break;
654 	default:
655 	    printf("?\n");
656 	}
657     }
658 }
659 
660 #endif
661