1 /*
2 ** Copyright (C) 2006-2020 by Carnegie Mellon University.
3 **
4 ** @OPENSOURCE_LICENSE_START@
5 ** See license information in ../../LICENSE.txt
6 ** @OPENSOURCE_LICENSE_END@
7 */
8 
9 /*
10 **  rwtransfer.c
11 **
12 **    This file contains functions that are common to rwsender and
13 **    rwreceiver, such as options processing and establishing the
14 **    connection.
15 **
16 */
17 
18 
19 #include <silk/silk.h>
20 
21 RCSIDENT("$SiLK: rwtransfer.c ef14e54179be 2020-04-14 21:57:45Z mthomas $");
22 
23 #include <silk/utils.h>
24 #include <silk/sklog.h>
25 #include <silk/skdaemon.h>
26 #include "rwtransfer.h"
27 
28 
29 /* LOCAL DEFINES AND TYPEDEFS */
30 
31 /* Illegal ident characters */
32 #define ILLEGAL_IDENT_CHARS " \t:/\\.,"
33 
34 /* Define lowest protocol version which we handle */
35 #define LOW_VERSION  1
36 
37 /* Version protocol we emit */
38 #define EMIT_VERISION 2
39 
40 /* Environment variable used to turn off keepalive.  Used for
41  * debugging. */
42 #define RWTRANSFER_TURN_OFF_KEEPALIVE "RWTRANSFER_TURN_OFF_KEEPALIVE"
43 
44 /* Maximum expected size of connection information string*/
45 #define RWTRANSFER_CONNECTION_TYPE_SIZE_MAX 50
46 
47 typedef struct connection_msg_data_st {
48     const char *name;
49     int32_t     size;
50 } connection_msg_data_t;
51 
52 
53 /* EXPORTED VARIABLE DEFINITIONS */
54 
55 int main_retval = EXIT_SUCCESS;
56 
57 
58 /* LOCAL VARIABLE DEFINITIONS */
59 
60 /* Mode (client/server) */
61 static enum {CLIENT, SERVER, NOT_SET} mode;
62 
63 #define OPTION_NOT_SEEN -1
64 
65 /* Initialize these to OPTION_NOT_SEEN. Set to the opt_index in the
66  * client and server option handlers to know what types of options
67  * were given. */
68 static int client_sentinel;
69 static int server_sentinel;
70 
71 /* Daemon identity */
72 static char *identity;
73 
74 /* Whether GnuTLS CA/key/certificate files were given */
75 static unsigned int tls_available;
76 
77 /* Message queue */
78 static sk_msg_queue_t *control;
79 
80 /* Temporary transfer_t item */
81 static transfer_t *global_temp_item;
82 
83 /* Control message thread */
84 static pthread_t control_thread;
85 static int       control_thread_valid;
86 
87 /* Address upon which to listen for incoming connections */
88 static sk_sockaddr_array_t *listen_address = NULL;
89 static const char *listen_address_arg = NULL;
90 
91 /* Locations which can be addressed as return values */
92 static void *exit_standard   = &exit_standard;
93 static void *exit_disconnect = &exit_disconnect;
94 static void *exit_failure    = &exit_failure;
95 
96 /* Main thread */
97 static pthread_t main_thread;
98 
99 /* Detached thread entry/exit control (see comment in serverMain()) */
100 static uint16_t detached_thread_count = 0;
101 static pthread_mutex_t detached_thread_mutex = PTHREAD_MUTEX_INITIALIZER;
102 static pthread_cond_t  detached_thread_cond  = PTHREAD_COND_INITIALIZER;
103 
104 typedef int (*connection_fn_t)(
105     sk_msg_queue_t *,
106     struct sockaddr *,
107     socklen_t,
108     skm_channel_t *);
109 
110 typedef struct conn_info_st {
111     sk_msg_queue_t *queue;
112     skm_channel_t   channel;
113     transfer_t     *trnsfr;
114     unsigned        tls;
115 } conn_info_t;
116 
117 typedef enum {
118     /* Global options */
119     OPT_MODE, OPT_IDENT
120 } appOptionsEnum;
121 
122 static struct option appOptions[] = {
123     {"mode",            REQUIRED_ARG, 0, OPT_MODE},
124     {"identifier",      REQUIRED_ARG, 0, OPT_IDENT},
125     {0,0,0,0}           /* sentinel entry */
126 };
127 
128 static const char *appHelp[] = {
129     ("Run as a client or as a server. Choices: client, server"),
130     ("Specify the name to use when establishing connections"),
131     (char *)NULL
132 };
133 
134 typedef enum {
135     /* Client options */
136     OPT_SERVER_ADDR
137 } appClientOptionsEnum;
138 
139 static struct option appClientOptions[] = {
140     {"server-address", REQUIRED_ARG, 0, OPT_SERVER_ADDR},
141     {0,0,0,0}           /* sentinel entry */
142 };
143 
144 static const char *appClientHelp[] = {
145     ("Connect to the server having this identifier, name,\n"
146      "\tand port, specified as IDENT:HOST:PORT. Wrap an IPv6 address in\n"
147      "\tsquare brackets. Repeat to connect to multiple servers"),
148     (char *)NULL
149 };
150 
151 typedef enum {
152     /* Server options */
153     OPT_SERVER_PORT, OPT_CLIENT_IDENT
154 } appServerOptionsEnum;
155 
156 static struct option appServerOptions[] = {
157     {"server-port",  REQUIRED_ARG, 0, OPT_SERVER_PORT},
158     {"client-ident", REQUIRED_ARG, 0, OPT_CLIENT_IDENT},
159     {0,0,0,0}           /* sentinel entry */
160 };
161 
162 static const char *appServerHelp[] = {
163     ("Listen for incoming client connections on this port.\n"
164      "\tListen on all addresses unless a host is provided before the port,\n"
165      "\tspecified as HOST:PORT. Wrap an IPv6 address in square brackets"),
166     ("Allow a client having this identifier to connect to\n"
167      "\tthis server. Repeat to allow connections from multiple clients"),
168     (char *)NULL
169 };
170 
171 
172 /*
173  *  Connection message textual representation and lengths.
174  *
175  *  Length of -1 indicates a variable length message (use of
176  *  sendString() implies variable length).
177  */
178 static connection_msg_data_t
179 conn_msg_data[CONN_NUMBER_OF_CONNECTION_MESSAGES] = {
180     {"CONN_SENDER_VERSION",    sizeof(uint32_t)},
181     {"CONN_RECEIVER_VERSION",  sizeof(uint32_t)},
182     {"CONN_IDENT",            -1},
183     {"CONN_READY",             0},
184     {"CONN_DISCONNECT_RETRY", -1},
185     {"CONN_DISCONNECT",       -1},
186     {"CONN_NEW_FILE",         -1},
187     {"CONN_NEW_FILE_READY",    0},
188     {"CONN_FILE_BLOCK",       -1},
189     {"CONN_FILE_COMPLETE",     0},
190     {"CONN_DUPLICATE_FILE",   -1},
191     {"CONN_REJECT_FILE",      -1}
192 };
193 
194 
195 /* LOCAL FUNCTION PROTOTYPES */
196 
197 static void *clientMain(void *); /* Thread entry point */
198 static void *serverMain(void *); /* Thread entry point */
199 static int
200 appOptionsHandler(
201     clientData          cData,
202     int                 opt_index,
203     char               *opt_arg);
204 static int
205 appClientOptionsHandler(
206     clientData          cData,
207     int                 opt_index,
208     char               *opt_arg);
209 static int
210 appServerOptionsHandler(
211     clientData          cData,
212     int                 opt_index,
213     char               *opt_arg);
214 static void
215 parseServerAddress(
216     const char         *const_addr);
217 static void
218 addClientIdent(
219     const char         *ident);
220 
221 
222 /* FUNCTION DEFINITIONS */
223 
224 /*
225  *  checkIdent(ident, switch_name);
226  *
227  *    Check to see if an ident is legal.  If illegal, print an error
228  *    message to the error stream, end exit.
229  */
230 int
checkIdent(const char * ident,const char * switch_name)231 checkIdent(
232     const char         *ident,
233     const char         *switch_name)
234 {
235     const char *invalid;
236     const char *c;
237 
238     if (ident == NULL || ident[0] == '\0') {
239         skAppPrintErr(
240             "Invalid %s: Identifier must contain at least one character",
241             switch_name);
242         exit(EXIT_FAILURE);
243     }
244     invalid = strpbrk(ident, ILLEGAL_IDENT_CHARS);
245     if (invalid != NULL) {
246         skAppPrintErr(
247             "Invalid %s: Identifier '%s' contains the illegal character '%c'",
248             switch_name, ident, *invalid);
249         exit(EXIT_FAILURE);
250     }
251     for (c = ident; *c; c++) {
252         if (!isprint((int)*c)) {
253             skAppPrintErr(("Invalid %s: Identifier '%s' contains"
254                            " the nonprintable character %#x"),
255                           switch_name, ident, (int)*c);
256             exit(EXIT_FAILURE);
257         }
258     }
259 
260     return 0;
261 }
262 
263 
264 /*
265  * appModeUsage();
266  *
267  *    Print usage information for the mode named 'mode_str', using the
268  *    given 'options' and 'help'.
269  */
270 static void
appModeUsage(FILE * fh,const char * mode_str,struct option options[],const char * help[])271 appModeUsage(
272     FILE               *fh,
273     const char         *mode_str,
274     struct option       options[],
275     const char         *help[])
276 {
277     unsigned int i;
278 
279     fprintf(fh, "\n%s switches:\n", mode_str);
280     for (i = 0; options[i].name; ++i) {
281         if (help[i]) {
282             fprintf(fh, "--%s %s. %s\n", options[i].name,
283                     SK_OPTION_HAS_ARG(options[i]), help[i]);
284         }
285     }
286 }
287 
288 
289 void
transferUsageLong(FILE * fh,const char * usage,struct option options[],const char * help[])290 transferUsageLong(
291     FILE               *fh,
292     const char         *usage,
293     struct option       options[],
294     const char         *help[])
295 {
296     unsigned int i;
297     unsigned int j;
298 
299     fprintf(fh, "%s %s", skAppName(), usage);
300 
301     fprintf(fh, "\nCommon switches:\n");
302     skOptionsDefaultUsage(fh);
303 
304     /* print common options defined in this file, but do not print
305      * encryption switches yet */
306     for (i = 0; appOptions[i].name; ++i) {
307         fprintf(fh, "--%s %s. %s\n", appOptions[i].name,
308                 SK_OPTION_HAS_ARG(appOptions[i]), appHelp[i]);
309     }
310     /* print the application-specific switches */
311     for (j = 0; options[j].name; ++j) {
312         fprintf(fh, "--%s %s. %s\n", options[j].name,
313                 SK_OPTION_HAS_ARG(options[j]), help[j]);
314     }
315     /* print switches for client and server mode */
316     appModeUsage(fh, "Client", appClientOptions, appClientHelp);
317     appModeUsage(fh, "Server", appServerOptions, appServerHelp);
318 
319     /* now print the encryption switches */
320     skMsgTlsOptionsUsage(fh);
321 
322     fprintf(fh, "\nLogging and daemon switches:\n");
323     skdaemonOptionsUsage(fh);
324 }
325 
326 
327 int
transferSetup(void)328 transferSetup(
329     void)
330 {
331     /* verify that the sizes of options and help match */
332     assert((sizeof(appHelp)/sizeof(char*)) ==
333            (sizeof(appOptions)/sizeof(struct option)));
334     assert((sizeof(appClientHelp)/sizeof(char*)) ==
335            (sizeof(appClientOptions)/sizeof(struct option)));
336     assert((sizeof(appServerHelp)/sizeof(char*)) ==
337            (sizeof(appServerOptions)/sizeof(struct option)));
338 
339     mode                  = NOT_SET;
340     client_sentinel       = OPTION_NOT_SEEN;
341     server_sentinel       = OPTION_NOT_SEEN;
342     identity              = NULL;
343     global_temp_item      = NULL;
344     control_thread_valid  = 0;
345 
346     /* register the options and handler */
347     if (skOptionsRegister(appOptions, &appOptionsHandler, NULL))
348     {
349         skAppPrintErr("Unable to transfer application options");
350         return -1;
351     }
352 
353     /* register the client options and handler */
354     if (skOptionsRegister(appClientOptions, &appClientOptionsHandler, NULL))
355     {
356         skAppPrintErr("Unable to register client options");
357         return -1;
358     }
359 
360     /* register the server options and handler */
361     if (skOptionsRegister(appServerOptions, &appServerOptionsHandler, NULL))
362     {
363         skAppPrintErr("Unable to register server options");
364         return -1;
365     }
366 
367     if (skMsgTlsOptionsRegister(password_env)) {
368         skAppPrintErr("Unable to register TLS-related options");
369         return -1;
370     }
371 
372     return 0;
373 }
374 
375 
376 /*
377  *  status = appOptionsHandler(cData, opt_index, opt_arg);
378  *
379  *    This function is passed to skOptionsRegister(); it will be called
380  *    by skOptionsParse() for each user-specified switch that the
381  *    application has registered; it should handle the switch as
382  *    required---typically by setting global variables---and return 1
383  *    if the switch processing failed or 0 if it succeeded.  Returning
384  *    a non-zero from from the handler causes skOptionsParse() to return
385  *    a negative value.
386  *
387  *    The clientData in 'cData' is typically ignored; 'opt_index' is
388  *    the index number that was specified as the last value for each
389  *    struct option in appOptions[]; 'opt_arg' is the user's argument
390  *    to the switch for options that have a REQUIRED_ARG or an
391  *    OPTIONAL_ARG.
392  */
393 static int
appOptionsHandler(clientData cData,int opt_index,char * opt_arg)394 appOptionsHandler(
395     clientData          cData,
396     int                 opt_index,
397     char               *opt_arg)
398 {
399     SK_UNUSED_PARAM(cData);
400 
401     switch ((appOptionsEnum)opt_index) {
402       case OPT_MODE:
403         if (0 == strcmp(opt_arg, "server")) {
404             mode = SERVER;
405         } else if (0 == strcmp(opt_arg, "client")) {
406             mode = CLIENT;
407         } else {
408             skAppPrintErr("Invalid --%s '%s'",
409                           appOptions[opt_index].name, opt_arg);
410             return 1;
411         }
412         break;
413 
414       case OPT_IDENT:
415         checkIdent(opt_arg, appOptions[opt_index].name);
416         identity = opt_arg;
417         break;
418     }
419 
420     return 0;  /* OK */
421 }
422 
423 
424 int
transferVerifyOptions(void)425 transferVerifyOptions(
426     void)
427 {
428     RBLIST *list;
429     transfer_t *item;
430     int error_count = 0;
431 
432     /* Check mode options */
433     if (mode == NOT_SET) {
434         skAppPrintErr(("Client or server mode must be chosen "
435                        "via the --%s switch"),
436                       appOptions[OPT_MODE].name);
437         ++error_count;
438     }
439     if ((mode == CLIENT && (server_sentinel != OPTION_NOT_SEEN)) ||
440         (mode == SERVER && (client_sentinel != OPTION_NOT_SEEN)))
441     {
442         int badopt = (mode == CLIENT) ? server_sentinel : client_sentinel;
443         struct option *opts =
444             (mode == CLIENT) ? appServerOptions : appClientOptions;
445         const char *mode_string = (mode == CLIENT) ? "client" : "server";
446         skAppPrintErr("The --%s switch cannot be used in %s mode",
447                       opts[badopt].name, mode_string);
448         return -1;
449     }
450 
451     if (identity == NULL) {
452         skAppPrintErr("The --%s switch is required",
453                       appOptions[OPT_IDENT].name);
454         ++error_count;
455     }
456 
457     if (rbmin(transfers) == NULL && mode != NOT_SET) {
458         skAppPrintErr("Must supply at least one --%s switch",
459                       (mode == CLIENT)
460                       ? appClientOptions[OPT_SERVER_ADDR].name
461                       : appServerOptions[OPT_CLIENT_IDENT].name);
462         ++error_count;
463     }
464 
465     if (skMsgTlsOptionsVerify(&tls_available)) {
466         ++error_count;
467     }
468 
469     switch (mode) {
470       case SERVER:
471         if (listen_address == NULL) {
472             skAppPrintErr("Must supply a port using --%s in server mode",
473                           appServerOptions[OPT_SERVER_PORT].name);
474             ++error_count;
475         }
476         break;
477 
478       case CLIENT:
479         list = rbopenlist(transfers);
480         if (list == NULL) {
481             skAppPrintErr("Memory allocation failure verifying options");
482             return -1;
483         }
484         while ((item = (transfer_t *)rbreadlist(list)) != NULL) {
485             if (item->address_exists == 0) {
486                 skAppPrintErr("Ident %s has no address associated with it",
487                               item->ident);
488                 return -1;
489             }
490         }
491         rbcloselist(list);
492         break;
493 
494       case NOT_SET:
495         break;
496     }
497 
498     if (error_count) {
499         return -1;
500     }
501 
502     main_thread = pthread_self();
503 
504     return 0;
505 }
506 
507 
508 void
transferShutdown(void)509 transferShutdown(
510     void)
511 {
512     RBLIST *iter;
513     transfer_t *trnsfr;
514     int rv;
515 
516     assert(shuttingdown);
517 
518     skMsgQueueShutdownAll(control);
519 
520     iter = rbopenlist(transfers);
521     CHECK_ALLOC(iter);
522     while ((trnsfr = (transfer_t *)rbreadlist(iter)) != NULL) {
523         rv = transferUnblock(trnsfr);
524         if (rv != 0) {
525             CRITMSG("Unexpected failure to unblock transfer");
526             _exit(EXIT_FAILURE);
527         }
528     }
529     rbcloselist(iter);
530 }
531 
532 
533 void
transferTeardown(void)534 transferTeardown(
535     void)
536 {
537     /* Wait for transfer threads to end.  In server mode, all these
538      * threads are detached, and as such may not be joinable.  */
539     if (mode != SERVER) {
540         RBLIST *iter;
541         transfer_t *trnsfr;
542 
543         iter = rbopenlist(transfers);
544         CHECK_ALLOC(iter);
545         while ((trnsfr = (transfer_t *)rbreadlist(iter)) != NULL) {
546             if (trnsfr->thread_exists) {
547                 DEBUGMSG("Waiting for thread %s to end...", trnsfr->ident);
548                 pthread_join(trnsfr->thread, NULL);
549                 DEBUGMSG("Thread %s has ended.", trnsfr->ident);
550             }
551         }
552         rbcloselist(iter);
553     }
554 
555     /* Wait for control thread to end */
556     if (control_thread_valid) {
557         DEBUGMSG("Waiting for control thread to end...");
558         pthread_join(control_thread, NULL);
559         DEBUGMSG("Control thread has ended.");
560     }
561 
562     /* Wait for detached threads to end */
563     DEBUGMSG("Waiting for detached threads to end...");
564     pthread_mutex_lock(&detached_thread_mutex);
565     while (detached_thread_count) {
566         pthread_cond_wait(&detached_thread_cond, &detached_thread_mutex);
567     }
568     pthread_mutex_unlock(&detached_thread_mutex);
569     DEBUGMSG("Detached threads have ended.");
570 
571     /* Destroy stuff */
572     skMsgQueueDestroy(control);
573     if (listen_address) {
574         skSockaddrArrayDestroy(listen_address);
575         listen_address = NULL;
576     }
577     if (global_temp_item != NULL) {
578         free(global_temp_item);
579     }
580 
581     skMsgGnuTLSTeardown();
582 }
583 
584 
585 static int
appClientOptionsHandler(clientData cData,int opt_index,char * opt_arg)586 appClientOptionsHandler(
587     clientData          cData,
588     int                 opt_index,
589     char               *opt_arg)
590 {
591     SK_UNUSED_PARAM(cData);
592 
593     client_sentinel = opt_index;
594 
595     switch ((appClientOptionsEnum)opt_index) {
596       case OPT_SERVER_ADDR:
597         parseServerAddress(opt_arg);
598         break;
599     }
600 
601     return 0;  /* OK */
602 }
603 
604 
605 static int
appServerOptionsHandler(clientData cData,int opt_index,char * opt_arg)606 appServerOptionsHandler(
607     clientData          cData,
608     int                 opt_index,
609     char               *opt_arg)
610 {
611     int rv;
612 
613     SK_UNUSED_PARAM(cData);
614 
615     server_sentinel = opt_index;
616 
617     switch ((appServerOptionsEnum)opt_index) {
618       case OPT_CLIENT_IDENT:
619         addClientIdent(opt_arg);
620         break;
621 
622       case OPT_SERVER_PORT:
623         rv = skStringParseHostPortPair(&listen_address,opt_arg, PORT_REQUIRED);
624         if (rv) {
625             skAppPrintErr("Invalid %s '%s': %s",
626                           appOptions[opt_index].name, opt_arg,
627                           skStringParseStrerror(rv));
628             return 1;
629         }
630         listen_address_arg = opt_arg;
631         break;
632     }
633 
634     return 0;  /* OK */
635 }
636 
637 
638 
639 /* String compare for receiver rbtree */
640 static int
transferCompare(const void * va,const void * vb,const void * cbdata)641 transferCompare(
642     const void         *va,
643     const void         *vb,
644     const void         *cbdata)
645 {
646     const transfer_t *a = (const transfer_t *)va;
647     const transfer_t *b = (const transfer_t *)vb;
648     SK_UNUSED_PARAM(cbdata);
649     return strcmp(a->ident, b->ident);
650 }
651 
652 
653 struct rbtree *
transferIdentTreeCreate(void)654 transferIdentTreeCreate(
655     void)
656 {
657     return rbinit(transferCompare, NULL);
658 }
659 
660 
661 /* Create temporary transfer_t objects */
662 transfer_t *
initTemp(void)663 initTemp(
664     void)
665 {
666     /* Allocate and/or clear the temporary item */
667     if (global_temp_item == NULL) {
668         global_temp_item = (transfer_t *)calloc(1, sizeof(*global_temp_item));
669     } else {
670         memset(global_temp_item, 0, sizeof(*global_temp_item));
671     }
672     return global_temp_item;
673 }
674 
675 /* If a program wishes to keep a temporary transfer object, it should
676    call this. */
677 void
clearTemp(void)678 clearTemp(
679     void)
680 {
681     global_temp_item = NULL;
682 }
683 
684 
685 /* Parse a <ident>:<address>:<port> specification */
686 static void
parseServerAddress(const char * const_addr)687 parseServerAddress(
688     const char         *const_addr)
689 {
690 #define FMT_PARSE_FAILURE                               \
691     ("Server address parse failure parsing '%s'\n"      \
692      "\tCorrect form is <ident>:<address>:<port>")
693 #define FMT_MEM_FAILURE                                         \
694     "Memory allocation failure when parsing server address %s"
695 
696     char *addr = strdup(const_addr);
697     char *colon, *next;
698     transfer_t *old;
699     const void *test;
700     int rv;
701     transfer_t *temp_item;
702 
703     temp_item = initTemp();
704 
705     if (addr == NULL || temp_item == NULL) {
706         skAppPrintErr(FMT_MEM_FAILURE, const_addr);
707         exit(EXIT_FAILURE);
708     }
709 
710     /* First, extract the ident */
711     colon = strchr(addr, ':');
712     if (colon == NULL) {
713         free(addr);
714         skAppPrintErr(FMT_PARSE_FAILURE, const_addr);
715         exit(EXIT_FAILURE);
716     }
717     *colon = '\0';
718     checkIdent(addr, appClientOptions[OPT_SERVER_ADDR].name);
719     temp_item->ident = addr;
720 
721 
722     /* See if it has already been used */
723     old = (transfer_t *)rbfind(temp_item, transfers);
724     if (old != NULL) {
725         if (!old->address_exists) {
726             memcpy(temp_item, old, sizeof(*temp_item));
727             test = rbdelete(old, transfers);
728             assert(test == old);
729             temp_item->ident = addr;
730             free(old->ident);
731             free(old);
732         } else {
733             free(addr);
734             skAppPrintErr("Duplicate ident in server address %s", const_addr);
735             exit(EXIT_FAILURE);
736         }
737     }
738 
739     /* Next, extract the address */
740     next = colon + 1;
741     rv = skStringParseHostPortPair(&temp_item->addr, next,
742                                    HOST_REQUIRED | PORT_REQUIRED);
743     if (rv < 0) {
744         skAppPrintErr("Could not parse address: %s",
745                       skStringParseStrerror(rv));
746         exit(EXIT_FAILURE);
747     }
748 
749     /* Add to our server list */
750     temp_item->ident = strdup(temp_item->ident);
751     free(addr);
752     if (temp_item->ident == NULL) {
753         skAppPrintErr(FMT_MEM_FAILURE, const_addr);
754         exit(EXIT_FAILURE);
755     }
756     test = rbsearch(temp_item, transfers);
757     if (test == NULL) {
758         skAppPrintErr(FMT_MEM_FAILURE, const_addr);
759         exit(EXIT_FAILURE);
760     }
761     temp_item->address_exists = 1;
762 
763     assert(test == temp_item);
764     clearTemp();
765 
766 #undef FMT_PARSE_FAILURE
767 #undef FMT_MEM_FAILURE
768 }
769 
770 
771 /* Add a bare ident to the transfer list */
772 static void
addClientIdent(const char * ident)773 addClientIdent(
774     const char         *ident)
775 {
776 #define FMT_MEM_FAILURE "Memory allocation failure when parsing ident %s"
777     const void *test;
778     transfer_t *temp_item;
779 
780     checkIdent(ident, appServerOptions[OPT_CLIENT_IDENT].name);
781     temp_item = initTemp();
782     if (temp_item == NULL) {
783         skAppPrintErr(FMT_MEM_FAILURE, ident);
784         exit(EXIT_FAILURE);
785     }
786     temp_item->ident = (char *)ident;
787     test = rbsearch(temp_item, transfers);
788     if (test == NULL) {
789         skAppPrintErr(FMT_MEM_FAILURE, ident);
790         exit(EXIT_FAILURE);
791     }
792     if (test != temp_item) {
793         skAppPrintErr("Duplicate ident %s", ident);
794         exit(EXIT_FAILURE);
795     }
796     temp_item->ident = strdup(ident);
797     if (temp_item->ident == NULL) {
798         skAppPrintErr(FMT_MEM_FAILURE, ident);
799         exit(EXIT_FAILURE);
800     }
801     clearTemp();
802 #undef FMT_MEM_FAILURE
803 }
804 
805 
806 static void
getConnectionInformation(sk_msg_queue_t * queue,skm_channel_t channel,char * buffer,size_t buffer_size)807 getConnectionInformation(
808     sk_msg_queue_t     *queue,
809     skm_channel_t       channel,
810     char               *buffer,
811     size_t              buffer_size)
812 {
813     int rv;
814 
815     rv = skMsgGetConnectionInformation(queue, channel, buffer, buffer_size);
816     if (rv == -1) {
817         strncpy(buffer, "<unknown>", buffer_size);
818     }
819     buffer[buffer_size - 1] = '\0';
820 }
821 
822 
823 int
handleDisconnect(sk_msg_t * msg,const char * type)824 handleDisconnect(
825     sk_msg_t           *msg,
826     const char         *type)
827 {
828     skm_type_t msgtyp = skMsgType(msg);
829 
830     if (msgtyp == CONN_DISCONNECT || msgtyp == CONN_DISCONNECT_RETRY) {
831         int length = MAX_ERROR_MESSAGE;
832 
833         if (skMsgLength(msg) < length) {
834             length = skMsgLength(msg);
835         }
836 
837         INFOMSG("Connection %s disconnected: %.*s",
838                 type, length, (char *)skMsgMessage(msg));
839 
840         return (msgtyp == CONN_DISCONNECT) ? -1 : 1;
841     }
842 
843     return 0;
844 }
845 
846 
847 /*
848  *    This function is used by servers and clients.  The function
849  *    verifies the connection (version, ident), and then calls the
850  *    transferFiles() function defined in rwsender.c or rwreceiver.c.
851  *
852  *    For a server, this is a THREAD ENTRY POINT.  Entry point for the
853  *    "connection" thread, started from serverMain().  This thread is
854  *    detached.
855  *
856  *    For a client, this function is called by startClientConnection()
857  *    once the client has connected to a server.
858  */
859 static void *
handleConnection(void * vinfo)860 handleConnection(
861     void               *vinfo)
862 {
863     conn_info_t *info = (conn_info_t *)vinfo;
864     transfer_t target;
865     transfer_t *trnsfr = NULL;
866     transfer_t *found = NULL;
867     uint32_t version;
868     skm_channel_t channel;
869     sk_msg_queue_t *q;
870     enum conn_state {Version, Ident, Ready, Running, Disconnect} state;
871     int proto_err;
872     int fatal_err = 0;
873     const char *ident = "<unassigned>";
874     void *retval = exit_failure;
875     char connection_type[RWTRANSFER_CONNECTION_TYPE_SIZE_MAX];
876     int transferred_file = 0;
877 
878     DEBUG_PRINT1("connection thread started");
879 
880     q = info->queue;
881     channel = info->channel;
882     trnsfr = info->trnsfr;
883     free(info);
884 
885     /* start by sending my version and waiting for remote's version */
886     state = Version;
887     version = htonl(EMIT_VERISION);
888     proto_err = skMsgQueueSendMessage(q, channel, local_version_check,
889                                       &version, sizeof(version));
890 
891     while (!shuttingdown && !proto_err && !fatal_err && state != Running) {
892         int rv;
893         sk_msg_t *msg;
894 
895         rv = skMsgQueueGetMessage(q, &msg);
896         if (rv == -1) {
897             ASSERT_ABORT(shuttingdown);
898             continue;
899         }
900         DEBUG_PRINT3("handleConnection() state=%d, got msg type=%d",
901                      (int)state, (int)skMsgType(msg));
902 
903         rv = handleDisconnect(msg, ident);
904         if (rv) {
905             proto_err = 1;
906             retval = transferred_file ? exit_disconnect : exit_failure;
907             state = Disconnect;
908         }
909 
910         switch (state) {
911           case Version:
912             /* expecting remote's version. if not valid, close the
913              * channel.  if valid, send my ident and wait for remote's
914              * ident */
915             if ((proto_err = checkMsg(msg, q, remote_version_check))) {
916                 DEBUG_PRINT2("checkMsg(%s) FAILED",
917                              conn_msg_data[remote_version_check].name);
918                 retval = exit_failure;
919                 break;
920             }
921             DEBUG_PRINT2("Received %s",
922                          conn_msg_data[remote_version_check].name);
923             version = MSG_UINT32(msg);
924             if (version < LOW_VERSION) {
925                 sendString(q, skMsgChannel(msg), EXTERNAL,
926                            CONN_DISCONNECT, LOG_WARNING,
927                            ("Unsupported version %" PRIu32), version);
928                 proto_err = 1;
929                 retval = exit_failure;
930                 break;
931             }
932             if (!getenv(RWTRANSFER_TURN_OFF_KEEPALIVE)) {
933                 rv = skMsgSetKeepalive(q, channel, KEEPALIVE_TIMEOUT);
934                 assert(rv == 0);
935             }
936             state = Ident;
937             proto_err = skMsgQueueSendMessage(q, channel, CONN_IDENT,
938                                               identity, strlen(identity) + 1);
939             if (proto_err != 0) {
940                 retval = exit_failure;
941             }
942             break;
943 
944           case Ident:
945             /* expecting remote's ident.  if not valid, close the
946              * channel.  if valid, send CONN_READY and wait for remote
947              * to say it is ready */
948             if ((proto_err = checkMsg(msg, q, CONN_IDENT))) {
949                 DEBUG_PRINT1("checkMsg(CONN_IDENT) FAILED");
950                 retval = exit_failure;
951                 break;
952             }
953             DEBUG_PRINT1("Received CONN_IDENT");
954             target.ident = MSG_CHARP(msg);
955             found = (transfer_t *)rbfind(&target, transfers);
956             if (found == NULL
957                 || (trnsfr != NULL && trnsfr != found)
958                 || (trnsfr == NULL && found->thread_exists))
959             {
960                 const char *reason;
961                 if (found == NULL) {
962                     reason = "Unknown ident";
963                 } else if (trnsfr != NULL && trnsfr != found) {
964                     reason = "Unexpected ident";
965                 } else {
966                     reason = "Duplicate ident";
967                 }
968                 sendString(q, skMsgChannel(msg), EXTERNAL,
969                            CONN_DISCONNECT, LOG_WARNING,
970                            "%s %s", reason, target.ident);
971                 proto_err = 1;
972                 retval = exit_failure;
973                 break;
974             }
975             ident = found->ident;
976             found->thread = pthread_self();
977             found->thread_exists = 1;
978             found->channel = channel;
979             found->channel_exists = 1;
980             found->remote_version = version;
981 
982             getConnectionInformation(q, channel, connection_type,
983                                      sizeof(connection_type));
984             INFOMSG("Connected to remote %s (%s, Protocol v%" PRIu32 ")",
985                     ident, connection_type, version);
986             state = Ready;
987             proto_err = skMsgQueueSendMessage(q, channel, CONN_READY, NULL, 0);
988             if (proto_err != 0) {
989                 DEBUG_PRINT1("skMsgQueueSendMessage(CONN_READY) failed");
990                 retval = exit_failure;
991             }
992             break;
993 
994           case Ready:
995             /* expecting remote to say it is ready. if ready, call
996              * transferFiles() */
997             if ((proto_err = checkMsg(msg, q, CONN_READY))) {
998                 DEBUG_PRINT1("checkMsg(CONN_READY) FAILED");
999                 retval = exit_failure;
1000                 break;
1001             }
1002             DEBUGMSG("Remote %s is ready for messages", ident);
1003             state = Running;
1004             rv = transferFiles(q, channel, found);
1005             switch (rv) {
1006               case -1:
1007                 fatal_err = 1;
1008                 break;
1009               case 1:
1010                 transferred_file = 1;
1011                 break;
1012               default:
1013                 break;
1014             }
1015             break;
1016 
1017           case Disconnect:
1018             DEBUG_PRINT1("Disconnecting");
1019             break;
1020 
1021           case Running:
1022             ASSERT_ABORT(0);
1023         }
1024 
1025         /* Free message */
1026         skMsgDestroy(msg);
1027     }
1028 
1029     if (found) {
1030         found->channel_exists = 0;
1031         found->disconnect = 0;
1032     }
1033 
1034     skMsgQueueDestroy(q);
1035 
1036     /* If running in server mode, this was a detached thread. */
1037     if (trnsfr == NULL) {
1038         if (found) {
1039             found->thread_exists = 0;
1040         }
1041         pthread_mutex_lock(&detached_thread_mutex);
1042         detached_thread_count--;
1043         pthread_cond_signal(&detached_thread_cond);
1044         pthread_mutex_unlock(&detached_thread_mutex);
1045     }
1046 
1047     DEBUG_PRINT2("connection thread ended (status = %s)",
1048                  ((fatal_err)
1049                   ? "exit_failure [from transferFiles()]"
1050                   : ((exit_standard == retval)
1051                      ? "exit_standard"
1052                      : ((exit_disconnect == retval)
1053                         ? "exit_disconnect"
1054                         : ((exit_failure == retval)
1055                            ? "exit_failure"
1056                            : "UNKNOWN")))));
1057 
1058     if (fatal_err) {
1059         threadExit(EXIT_FAILURE, exit_failure);
1060     }
1061 
1062     return retval;
1063 }
1064 
1065 
1066 /*
1067  *    THREAD ENTRY POINT
1068  *
1069  *    Entry point for the "server_main" thread, started from
1070  *    startTransferDaemon().
1071  */
1072 static void *
serverMain(void * dummy)1073 serverMain(
1074     void       *dummy)
1075 {
1076     int rv;
1077     const char *connection_type = (tls_available ? "TCP, TLS" : "TCP");
1078 
1079     SK_UNUSED_PARAM(dummy);
1080 
1081     control_thread_valid = 1;
1082 
1083     DEBUG_PRINT1("server_main() thread started");
1084 
1085     assert(listen_address);
1086 
1087     rv = skMsgQueueBind(control, listen_address);
1088     if (rv < 0) {
1089         CRITMSG("Failed to bind to %s for listening", listen_address_arg);
1090         threadExit(EXIT_FAILURE, NULL);
1091     }
1092 
1093     INFOMSG("Bound to %s for listening (%s)",
1094             listen_address_arg, connection_type);
1095 
1096     while (!shuttingdown) {
1097         sk_msg_t *msg;
1098         skm_channel_t channel;
1099         pthread_t thread;
1100         conn_info_t *info;
1101         transfer_t *item;
1102         RBLIST *list;
1103         sk_new_channel_info_t *addr_info;
1104         char buf[PATH_MAX];
1105         char conn_type[RWTRANSFER_CONNECTION_TYPE_SIZE_MAX];
1106 
1107         rv = skMsgQueueGetMessageFromChannel(control, SKMSG_CHANNEL_CONTROL,
1108                                              &msg);
1109         if (rv == -1) {
1110             ASSERT_ABORT(shuttingdown);
1111             continue;
1112         }
1113 
1114         switch (skMsgType(msg)) {
1115 
1116           case SKMSG_CTL_NEW_CONNECTION:
1117             DEBUG_PRINT1("Received SKMSG_CTL_NEW_CONNECTION");
1118             channel = SKMSG_CTL_MSG_GET_CHANNEL(msg);
1119             addr_info = (sk_new_channel_info_t *)skMsgMessage(msg);
1120             getConnectionInformation(control, channel, conn_type,
1121                                      sizeof(conn_type));
1122             if (addr_info->known) {
1123                 skSockaddrString(buf, sizeof(buf), &addr_info->addr);
1124             }
1125             INFOMSG("Received connection from %s (%s)",
1126                     (addr_info->known ? buf : "unknown address"), conn_type);
1127             info = (conn_info_t *)calloc(1, sizeof(*info));
1128             if (info == NULL) {
1129                 CRITMSG("Memory allocation failure");
1130                 threadExit(EXIT_FAILURE, NULL);
1131             }
1132             info->tls = tls_available;
1133             info->trnsfr = NULL;
1134             rv = skMsgChannelSplit(control, channel, &info->queue);
1135             if (rv != 0) {
1136                 if (shuttingdown) {
1137                     break;
1138                 }
1139                 CRITMSG("Failed to split channel");
1140                 threadExit(EXIT_FAILURE, NULL);
1141             }
1142             info->channel = channel;
1143 
1144             /* In server mode we don't have one thread per ident.
1145              * Instead we have one thread per entity that is
1146              * connecting to us.  Since there is no transfer object to
1147              * attach the thread to, we create a detached thread
1148              * instead, and use the detached_thread_mutex and
1149              * detached_thread_count to know when the threads have
1150              * ended. */
1151             pthread_mutex_lock(&detached_thread_mutex);
1152             rv = skthread_create_detached("connection", &thread,
1153                                           handleConnection, info);
1154             if (rv != 0) {
1155                 pthread_mutex_unlock(&detached_thread_mutex);
1156                 CRITMSG("Failed to create connection thread: %s",
1157                         strerror(rv));
1158                 threadExit(EXIT_FAILURE, NULL);
1159             }
1160             detached_thread_count++;
1161             pthread_mutex_unlock(&detached_thread_mutex);
1162             break;
1163 
1164           case SKMSG_CTL_CHANNEL_DIED:
1165             DEBUG_PRINT1("Received SKMSG_CTL_CHANNEL_DIED");
1166             channel = SKMSG_CTL_MSG_GET_CHANNEL(msg);
1167             list = rbopenlist(transfers);
1168             CHECK_ALLOC(list);
1169             while ((item = (transfer_t *)rbreadlist(list)) != NULL) {
1170                 if (item->channel_exists && channel == item->channel) {
1171                     INFOMSG("Channel to %s died", item->ident);
1172                     item->disconnect = 1;
1173                     rv = transferUnblock(item);
1174                     if (rv != 0) {
1175                         threadExit(EXIT_FAILURE, NULL);
1176                     }
1177                     break;
1178                 }
1179             }
1180             rbcloselist(list);
1181 
1182             if (!shuttingdown) {
1183                 sendString(control, channel, INTERNAL, CONN_DISCONNECT_RETRY,
1184                            LOG_INFO, "Remote side of channel died");
1185             }
1186             break;
1187 
1188           default:
1189             WARNINGMSG("Received unknown control message %d", skMsgType(msg));
1190         }
1191 
1192         skMsgDestroy(msg);
1193     }
1194 
1195     DEBUG_PRINT1("server_main() thread ended");
1196 
1197     return NULL;
1198 }
1199 
1200 
1201 /*
1202  *    THREAD ENTRY POINT
1203  *
1204  *    Entry point for the "connection" thread, started from
1205  *    clientMain().
1206  */
1207 static void *
startClientConnection(void * vitem)1208 startClientConnection(
1209     void               *vitem)
1210 {
1211     transfer_t *item = (transfer_t *)vitem;
1212     void *exit_status = exit_standard;
1213     int waitsecs = 0;
1214     const char *connection_type = (tls_available ? "TCP, TLS" : "TCP");
1215     socklen_t addrlen;
1216     char buf[SKIPADDR_STRLEN];
1217 
1218     item->thread_exists = 1;
1219 
1220     DEBUG_PRINT1("client_connection() thread started");
1221 
1222     while (!shuttingdown) {
1223         size_t i;
1224         int rv;
1225         skm_channel_t channel;
1226 
1227         if (waitsecs != 0) {
1228             int waitcount = waitsecs;
1229 
1230             DEBUG_PRINT2("Failure in connection, "
1231                          "waiting %d seconds", waitcount);
1232             while (waitcount-- && !shuttingdown) {
1233                 sleep(1);
1234             }
1235             if (shuttingdown) {
1236                 break;
1237             }
1238         }
1239 
1240         INFOMSG("Attempting to connect to %s (%s)...",
1241                 item->ident, connection_type);
1242 
1243         for (rv = -1, i = 0;
1244              rv != 0 && i < skSockaddrArrayGetSize(item->addr); i++)
1245         {
1246             sk_sockaddr_t *addr = skSockaddrArrayGet(item->addr, i);
1247             switch (addr->sa.sa_family) {
1248               case AF_INET:
1249                 addrlen = sizeof(addr->v4);
1250                 break;
1251               case AF_INET6:
1252                 addrlen = sizeof(addr->v6);
1253                 break;
1254               default:
1255                 continue;
1256             }
1257             skSockaddrString(buf, sizeof(buf), addr);
1258             DEBUGMSG("Address for %s is %s", item->ident, buf);
1259             rv = skMsgQueueConnect(control, &addr->sa, addrlen, &channel);
1260         }
1261 
1262         if (rv != 0) {
1263             INFOMSG("Attempt to connect to %s failed (%s)",
1264                     item->ident, connection_type);
1265             if (waitsecs < 60) {
1266                 waitsecs += 5;
1267             }
1268         } else {
1269             conn_info_t *info;
1270             char conn_type[RWTRANSFER_CONNECTION_TYPE_SIZE_MAX];
1271             uint16_t port = 0;
1272 
1273             getConnectionInformation(control, channel,
1274                                      conn_type, sizeof(conn_type));
1275             skMsgGetLocalPort(control, channel, &port);
1276             DEBUGMSG("Connected (expecting ident %s) (local port %u, %s)",
1277                      item->ident, port, conn_type);
1278             info = (conn_info_t *)calloc(1, sizeof(*info));
1279             if (info == NULL) {
1280                 CRITMSG("Memory allocation failure");
1281                 threadExit(EXIT_FAILURE, exit_failure);
1282             }
1283             info->tls = tls_available;
1284             info->trnsfr = item;
1285             rv = skMsgChannelSplit(control, channel, &info->queue);
1286             if (rv != 0) {
1287                 if (shuttingdown) {
1288                     break;
1289                 }
1290                 CRITMSG("Failed to split channel");
1291                 threadExit(EXIT_FAILURE, exit_failure);
1292             }
1293 
1294             info->channel = channel;
1295             exit_status = handleConnection(info);
1296             if (exit_status != exit_failure) {
1297                 waitsecs = 0;
1298             } else if (waitsecs < 60) {
1299                 waitsecs += 5;
1300             }
1301         }
1302     }
1303 
1304     DEBUG_PRINT1("client_connection() thread ended");
1305 
1306     return exit_status;
1307 }
1308 
1309 
1310 /*
1311  *    THREAD ENTRY POINT
1312  *
1313  *    Entry point for the "client_main" thread, started from
1314  *    startTransferDaemon()
1315  */
1316 static void *
clientMain(void * dummy)1317 clientMain(
1318     void               *dummy)
1319 {
1320     RBLIST *list;
1321     transfer_t *item;
1322     int rv;
1323 
1324     SK_UNUSED_PARAM(dummy);
1325 
1326     control_thread_valid = 1;
1327 
1328     DEBUG_PRINT1("client_main() thread started");
1329 
1330     list = rbopenlist(transfers);
1331     if (list == NULL) {
1332         skAppPrintErr("Memory allocation failure stating client thread");
1333         threadExit(EXIT_FAILURE, NULL);
1334     }
1335 
1336     /* Start client threads */
1337     while ((item = (transfer_t *)rbreadlist(list)) != NULL) {
1338         rv = skthread_create("connection", &item->thread,
1339                              startClientConnection, item);
1340         if (rv != 0) {
1341             CRITMSG("Failed to create connection thread: %s", strerror(rv));
1342             threadExit(EXIT_FAILURE, NULL);
1343         }
1344     }
1345     rbcloselist(list);
1346 
1347     /* Start control loop */
1348     while (!shuttingdown) {
1349         sk_msg_t *msg;
1350         skm_channel_t channel;
1351 
1352         rv = skMsgQueueGetMessageFromChannel(control, SKMSG_CHANNEL_CONTROL,
1353                                              &msg);
1354         if (rv == -1) {
1355             assert(shuttingdown);
1356             continue;
1357         }
1358 
1359         switch (skMsgType(msg)) {
1360 
1361           case SKMSG_CTL_NEW_CONNECTION:
1362             /* This can't happen, as we aren't bound */
1363             ASSERT_ABORT(0);
1364             break;
1365 
1366           case SKMSG_CTL_CHANNEL_DIED:
1367             DEBUG_PRINT1("Received SKMSG_CTL_CHANNEL_DIED");
1368             channel = SKMSG_CTL_MSG_GET_CHANNEL(msg);
1369             list = rbopenlist(transfers);
1370             CHECK_ALLOC(list);
1371             while ((item = (transfer_t *)rbreadlist(list)) != NULL) {
1372                 if (item->channel_exists && channel == item->channel) {
1373                     INFOMSG("Channel to %s died", item->ident);
1374                     item->disconnect = 1;
1375                     rv = transferUnblock(item);
1376                     if (rv != 0) {
1377                         threadExit(EXIT_FAILURE, NULL);
1378                     }
1379                     break;
1380                 }
1381             }
1382             rbcloselist(list);
1383 
1384             sendString(control, channel, INTERNAL, CONN_DISCONNECT_RETRY,
1385                        LOG_INFO, "Remote side of channel died");
1386             break;
1387 
1388           default:
1389             WARNINGMSG("Received unknown control message %d", skMsgType(msg));
1390         }
1391 
1392         skMsgDestroy(msg);
1393     }
1394 
1395     DEBUG_PRINT1("client_main() thread ended");
1396     return NULL;
1397 }
1398 
1399 
1400 int
startTransferDaemon(void)1401 startTransferDaemon(
1402     void)
1403 {
1404     int rv;
1405 
1406     /* Initialize the message queue */
1407     rv = skMsgQueueCreate(&control);
1408     if (rv != 0) {
1409         skAppPrintErr("Failed to initialize message queue");
1410         exit(EXIT_FAILURE);
1411     }
1412 
1413     switch (mode) {
1414       case CLIENT:
1415         rv = skthread_create("client_main", &control_thread,
1416                              clientMain, NULL);
1417         if (rv != 0) {
1418             CRITMSG("Failed to create primary client thread: %s",strerror(rv));
1419             return -1;
1420         }
1421         break;
1422       case SERVER:
1423         rv = skthread_create("server_main", &control_thread,
1424                              serverMain, NULL);
1425         if (rv != 0) {
1426             CRITMSG("Failed to create primary server thread: %s",strerror(rv));
1427             return -1;
1428         }
1429         break;
1430       default:
1431         ASSERT_ABORT(0);
1432     }
1433 
1434     return 0;
1435 }
1436 
1437 
1438 int
checkMsg(sk_msg_t * msg,sk_msg_queue_t * q,connection_msg_t type)1439 checkMsg(
1440     sk_msg_t           *msg,
1441     sk_msg_queue_t     *q,
1442     connection_msg_t    type)
1443 {
1444     skm_type_t t;
1445     skm_len_t  len;
1446 
1447     assert(msg);
1448     assert(q);
1449     assert(type < CONN_NUMBER_OF_CONNECTION_MESSAGES);
1450 
1451     t = skMsgType(msg);
1452 
1453     if (t != type) {
1454         sendString(q, skMsgChannel(msg), EXTERNAL,
1455                    CONN_DISCONNECT, LOG_WARNING,
1456                    "Protocol error: expected %s, got %s (0x%04x)",
1457                    conn_msg_data[type].name,
1458                    ((t >= CONN_NUMBER_OF_CONNECTION_MESSAGES)
1459                     ? "<unknown>"
1460                     : conn_msg_data[t].name),
1461                    t);
1462         return 1;
1463     }
1464 
1465     if (conn_msg_data[type].size == -1) {
1466         return 0;
1467     }
1468 
1469     len = skMsgLength(msg);
1470     if (len != conn_msg_data[type].size) {
1471         sendString(q, skMsgChannel(msg), EXTERNAL,
1472                    CONN_DISCONNECT, LOG_WARNING,
1473                    "Protocol error: type %s, expected len %" PRId32 ", got %d",
1474                    conn_msg_data[type].name,
1475                    conn_msg_data[type].size, len);
1476         return 1;
1477     }
1478 
1479     return 0;
1480 }
1481 
1482 
1483 #undef sendString
1484 int
sendString(sk_msg_queue_t * q,skm_channel_t channel,int internal,skm_type_t type,int loglevel,const char * fmt,...)1485 sendString(
1486     sk_msg_queue_t     *q,
1487     skm_channel_t       channel,
1488     int                 internal,
1489     skm_type_t          type,
1490     int                 loglevel,
1491     const char         *fmt,
1492     ...)
1493 {
1494     int rv;
1495     va_list args;
1496     char msg[MAX_ERROR_MESSAGE];
1497     int len;
1498 
1499     va_start(args, fmt);
1500     len = vsnprintf(msg, sizeof(msg), fmt, args);
1501     if (len >= (int)sizeof(msg)) {
1502         len = sizeof(msg) - 1;
1503         msg[len] = '\0';
1504     }
1505 
1506     if (internal) {
1507         rv = skMsgQueueInjectMessage(q, channel, type, msg, len + 1);
1508     } else {
1509         rv = skMsgQueueSendMessage(q, channel, type, msg, len + 1);
1510     }
1511 
1512     if (!internal) {
1513         sklog(loglevel, "Sending \"%s\"", msg);
1514     }
1515     return rv;
1516 }
1517 
1518 
1519 void
threadExit(int status,void * retval)1520 threadExit(
1521     int                 status,
1522     void               *retval)
1523 {
1524     DEBUG_PRINT1("threadExit called");
1525     main_retval = status;
1526     pthread_kill(main_thread, SIGQUIT);
1527     pthread_exit(retval);
1528 }
1529 
1530 
1531 /*
1532 ** Local Variables:
1533 ** mode:c
1534 ** indent-tabs-mode:nil
1535 ** c-basic-offset:4
1536 ** End:
1537 */
1538