1 /*
2    Bacula(R) - The Network Backup Solution
3 
4    Copyright (C) 2000-2020 Kern Sibbald
5 
6    The original author of Bacula is Kern Sibbald, with contributions
7    from many others, a complete list can be found in the file AUTHORS.
8 
9    You may use this file and others of this release according to the
10    license defined in the LICENSE file, which includes the Affero General
11    Public License, v3.0 ("AGPLv3") and some additional permissions and
12    terms pursuant to its AGPLv3 Section 7.
13 
14    This notice must be preserved when any source code is
15    conveyed and/or propagated.
16 
17    Bacula(R) is a registered trademark of Kern Sibbald.
18 */
19 /*
20  * Bacula Core Sock Class definition
21  *
22  * Kern Sibbald, May MM
23  *
24  * Major refactoring of BSOCK code written by:
25  *
26  * Radoslaw Korzeniewski, MMXVIII
27  * radoslaw@korzeniewski.net, radekk@inteos.pl
28  * Inteos Sp. z o.o. http://www.inteos.pl/
29  *
 * This is a common class for socket network communication, factored out of
 * the BSOCK class. It acts as a base class for non-Bacula network
 * communication and as a base class for the standard BSOCK implementation.
 * Basically, the BSOCK class did not change its functionality for any Bacula
 * specific part. Now you can use BSOCKCORE for other network communication.
35  */
36 
37 #include "bacula.h"
38 #include "jcr.h"
39 #include <netdb.h>
40 #include <netinet/tcp.h>
41 
/* Debug level used by the BSOCKCORE trace messages in this file */
#define BSOCKCORE_DEBUG_LVL    900

/* Fall back to EPIPE where the platform has no ENODATA */
#if !defined(ENODATA)              /* not defined on BSD systems */
#define ENODATA  EPIPE
#endif

/* Fall back to IPPROTO_TCP as the option level where SOL_TCP is missing */
#if !defined(SOL_TCP)              /* Not defined on some systems */
#define SOL_TCP  IPPROTO_TCP
#endif

/*
 * Windows build: declare the local close helper and make SOCK_CLOEXEC
 * a no-op flag when the platform headers do not provide it.
 */
#ifdef HAVE_WIN32
#include <mswsock.h>
static void win_close_wait(int fd);
#ifndef SOCK_CLOEXEC
#define SOCK_CLOEXEC 0
#endif
#endif
59 
60 /*
61  * make a nice dump of a message
62  */
dump_bsock_msg(int sock,uint32_t msgno,const char * what,uint32_t rc,int32_t pktsize,uint32_t flags,POOLMEM * amsg,int32_t amsglen)63 void BSOCKCORE::dump_bsock_msg(int sock, uint32_t msgno, const char *what, uint32_t rc, int32_t pktsize, uint32_t flags,
64                                POOLMEM *amsg, int32_t amsglen)
65 {
66    char buf[54];
67    bool is_ascii;
68    int dbglvl = DT_NETWORK;
69 
70    if (amsglen<0) {
71       Dmsg5(dbglvl, "0x%p: %s %d:%d SIGNAL=%s\n", this, what, sock, msgno, bnet_sig_to_ascii(amsglen));
72    } else if (flags & BNET_IS_CMD) {
73       /* this is a command */
74       int32_t command;
75       int32_t capacity;
76       int64_t counter;
77       int size;
78       char *hash;
79       char *block;
80       int hash_size;
81       unser_declare;
82       unser_begin(amsg, amsglen);
83       unser_int32(command);
84       switch (command) {
85       case BNET_CMD_GET_HASH:
86       case BNET_CMD_ACK_HASH:
87       case BNET_CMD_UNK_HASH:
88          unser_assign(hash, sizeof(int)); /* I use only 4 bytes of the hash */
89          unser_end(amsg, amsglen);
90          Dmsg6(dbglvl, "%s %d:%d %s len=%ld #%08x\n", what, sock, msgno, bnet_cmd_to_name(command), amsglen, hash2int(hash));
91          break;
92       case BNET_CMD_STO_BLOCK:
93          /* unfortunately we don't know the hash size and don't know the offset
94           * of the data, we'll bet this is the default size and check to not
95           * access mem after the end
96           */
97          hash_size=bhash_info(DEDUP_DEFAULT_HASH_ID, NULL);
98          unser_assign(hash, hash_size); /* I only display the 4 first byte of the hash */
99          size=amsglen-(BNET_CMD_SIZE+hash_size);
100          if (size>0) {
101             unser_assign(block, size);
102             unser_end(amsg, amsglen);
103             smartdump(block, size, buf, sizeof(buf)-9, &is_ascii);
104          } else {
105             buf[0] = '\0';
106             is_ascii = false;
107          }
108          if (is_ascii) {
109             Dmsg7(dbglvl, "%s %d:%d %s size=%d #%08x \"%s\"\n", what, sock, msgno, bnet_cmd_to_name(command), size, hash2int(hash), buf);
110          } else {
111             Dmsg7(dbglvl, "%s %d:%d %s size=%d #%08x %s\n", what, sock, msgno, bnet_cmd_to_name(command), size, hash2int(hash), buf);
112          }
113          break;
114       case BNET_CMD_REC_ACK:
115          unser_int32(capacity);
116          unser_int64(counter);
117          unser_end(amsg, amsglen);
118          Dmsg6(dbglvl, "%s %d:%d %s cnt=%lld cap=%ld\n", what, sock, msgno, bnet_cmd_to_name(command), counter, capacity);
119          break;
120       case BNET_CMD_NONE:
121       case BNET_CMD_STP_THREAD:
122       default:
123          Dmsg5(dbglvl, "%s %d:%d %s len=%ld\n", what, sock, msgno, bnet_cmd_to_name(command), amsglen);
124          break;
125       }
126    } else {
127       // data
128       smartdump(amsg, amsglen, buf, sizeof(buf)-9, &is_ascii);
129       if (is_ascii) {
130          Dmsg6(dbglvl, "0x%p: %s %d:%d len=%d \"%s\"\n", this, what, sock, msgno, amsglen, buf);
131       } else {
132          Dmsg6(dbglvl, "0x%p: %s %d:%d len=%d %s\n", this, what, sock, msgno, amsglen, buf);
133       }
134    }
135 }
136 
BSOCKCallback()137 BSOCKCallback::BSOCKCallback()
138 {
139 }
140 
~BSOCKCallback()141 BSOCKCallback::~BSOCKCallback()
142 {
143 }
144 
145 /*
146  * Default constructor does class initialization.
147  */
BSOCKCORE()148 BSOCKCORE::BSOCKCORE() :
149    msg(NULL),
150    errmsg(NULL),
151    res(NULL),
152    tls(NULL),
153    src_addr(NULL),
154    read_seqno(0),
155    in_msg_no(0),
156    out_msg_no(0),
157    pout_msg_no(NULL),
158    msglen(0),
159    timer_start(0),
160    timeout(0),
161    m_fd(-1),
162    b_errno(0),
163    m_blocking(0),
164    errors(0),
165    m_suppress_error_msgs(false),
166    send_hook_cb(NULL),
167    m_next(NULL),
168    m_jcr(NULL),
169    pm_rmutex(NULL),
170    pm_wmutex(NULL),
171    m_who(NULL),
172    m_host(NULL),
173    m_port(0),
174    m_tid(NULL),
175    m_flags(0),
176    m_timed_out(false),
177    m_terminated(false),
178    m_closed(false),
179    m_duped(false),
180    m_use_locking(false),
181    m_bwlimit(0),
182    m_bandwidth(0),
183    m_nb_bytes(0),
184    m_last_tick(0),
185    m_rtt(0)
186 {
187    pthread_mutex_init(&m_rmutex, NULL);
188    pthread_mutex_init(&m_wmutex, NULL);
189    pthread_mutex_init(&m_mmutex, NULL);
190    bmemzero(&peer_addr, sizeof(peer_addr));
191    bmemzero(&client_addr, sizeof(client_addr));
192    init();
193 };
194 
195 /*
196  * Default destructor releases resources.
197  */
~BSOCKCORE()198 BSOCKCORE::~BSOCKCORE()
199 {
200    Dmsg0(BSOCKCORE_DEBUG_LVL, "BSOCKCORE::~BSOCKCORE()\n");
201    _destroy();
202 };
203 
204 /*
205  * Initialization method.
206  */
init()207 void BSOCKCORE::init()
208 {
209    m_master = this;
210    set_closed();
211    set_terminated();
212    m_blocking = 1;
213    msg = get_pool_memory(PM_BSOCK);
214    errmsg = get_pool_memory(PM_MESSAGE);
215    timeout = BSOCKCORE_TIMEOUT;
216    pout_msg_no = &out_msg_no;
217 }
218 
free_tls()219 void BSOCKCORE::free_tls()
220 {
221    free_tls_connection(this->tls);
222    this->tls = NULL;
223 }
224 
225 /*
226  * Try to connect to host for max_retry_time at retry_time intervals.
227  *   Note, you must have called the constructor prior to calling
228  *   this routine.
229  */
connect(JCR * jcr,int retry_interval,utime_t max_retry_time,utime_t heart_beat,const char * name,char * host,char * service,int port,int verbose)230 bool BSOCKCORE::connect(JCR * jcr, int retry_interval, utime_t max_retry_time,
231                     utime_t heart_beat,
232                     const char *name, char *host, char *service, int port,
233                     int verbose)
234 {
235    bool ok = false;
236    int i;
237    int fatal = 0;
238    time_t begin_time = time(NULL);
239    time_t now;
240    btimer_t *tid = NULL;
241 
242    /* Try to trap out of OS call when time expires */
243    if (max_retry_time) {
244       tid = start_thread_timer(jcr, pthread_self(), (uint32_t)max_retry_time);
245    }
246 
247    for (i = 0; !open(jcr, name, host, service, port, heart_beat, &fatal);
248         i -= retry_interval) {
249       berrno be;
250       if (fatal || (jcr && job_canceled(jcr))) {
251          goto bail_out;
252       }
253       Dmsg4(50, "Unable to connect to %s on %s:%d. ERR=%s\n",
254             name, host, port, be.bstrerror());
255       if (i < 0) {
256          i = 60 * 5;               /* complain again in 5 minutes */
257          if (verbose)
258             Qmsg4(jcr, M_WARNING, 0, _(
259                "Could not connect to %s on %s:%d. ERR=%s\n"
260                "Retrying ...\n"), name, host, port, be.bstrerror());
261       }
262       bmicrosleep(retry_interval, 0);
263       now = time(NULL);
264       if (begin_time + max_retry_time <= now) {
265          Qmsg4(jcr, M_FATAL, 0, _("Unable to connect to %s on %s:%d. ERR=%s\n"),
266                name, host, port, be.bstrerror());
267          goto bail_out;
268       }
269    }
270    ok = true;
271 
272 bail_out:
273    if (tid) {
274       stop_thread_timer(tid);
275    }
276    return ok;
277 }
278 
279 /*
280  * Finish initialization of the packet structure.
281  */
fin_init(JCR * jcr,int sockfd,const char * who,const char * host,int port,struct sockaddr * lclient_addr)282 void BSOCKCORE::fin_init(JCR * jcr, int sockfd, const char *who, const char *host, int port,
283                struct sockaddr *lclient_addr)
284 {
285    Dmsg3(100, "who=%s host=%s port=%d\n", who, host, port);
286    m_fd = sockfd;
287    if (m_who) {
288       free(m_who);
289    }
290    if (m_host) {
291       free(m_host);
292    }
293    set_who(bstrdup(who));
294    set_host(bstrdup(host));
295    set_port(port);
296    memcpy(&client_addr, lclient_addr, sizeof(client_addr));
297    set_jcr(jcr);
298 }
299 
300 /*
301  * Copy the address from the configuration dlist that gets passed in
302  */
set_source_address(dlist * src_addr_list)303 void BSOCKCORE::set_source_address(dlist *src_addr_list)
304 {
305    IPADDR *addr = NULL;
306 
307    // delete the object we already have, if it's allocated
308    if (src_addr) {
309      /* TODO: Why free() instead of delete as src_addr is a IPADDR class */
310      free( src_addr);
311      src_addr = NULL;
312    }
313 
314    if (src_addr_list) {
315      addr = (IPADDR*) src_addr_list->first();
316      src_addr = New( IPADDR(*addr));
317    }
318 }
319 
320 /*
321  * Open a TCP connection to the server
322  *    Returns true when connection was successful or false otherwise.
323  */
open(JCR * jcr,const char * name,char * host,char * service,int port,utime_t heart_beat,int * fatal)324 bool BSOCKCORE::open(JCR *jcr, const char *name, char *host, char *service,
325                int port, utime_t heart_beat, int *fatal)
326 {
327    int sockfd = -1;
328    dlist *addr_list;
329    IPADDR *ipaddr;
330    bool connected = false;
331    int turnon = 1;
332    const char *errstr;
333    int save_errno = 0;
334 
335    /*
336     * Fill in the structure serv_addr with the address of
337     * the server that we want to connect with.
338     */
339    if ((addr_list = bnet_host2ipaddrs(host, 0, &errstr)) == NULL) {
340       /* Note errstr is not malloc'ed */
341       Qmsg2(jcr, M_ERROR, 0, _("gethostbyname() for host \"%s\" failed: ERR=%s\n"),
342             host, errstr);
343       Dmsg2(100, "bnet_host2ipaddrs() for host %s failed: ERR=%s\n",
344             host, errstr);
345       *fatal = 1;
346       return false;
347    }
348 
349    remove_duplicate_addresses(addr_list);
350    foreach_dlist(ipaddr, addr_list) {
351       ipaddr->set_port_net(htons(port));
352       char allbuf[256 * 10];
353       char curbuf[256];
354       Dmsg2(100, "Current %sAll %s\n",
355                    ipaddr->build_address_str(curbuf, sizeof(curbuf)),
356                    build_addresses_str(addr_list, allbuf, sizeof(allbuf)));
357       /* Open a TCP socket */
358       if ((sockfd = socket(ipaddr->get_family(), SOCK_STREAM|SOCK_CLOEXEC, 0)) < 0) {
359          berrno be;
360          save_errno = errno;
361          switch (errno) {
362 #ifdef EAFNOSUPPORT
363          case EAFNOSUPPORT:
364             /*
365              * The name lookup of the host returned an address in a protocol family
366              * we don't support. Suppress the error and try the next address.
367              */
368             break;
369 #endif
370 #ifdef EPROTONOSUPPORT
371          /* See above comments */
372          case EPROTONOSUPPORT:
373             break;
374 #endif
375 #ifdef EPROTOTYPE
376          /* See above comments */
377          case EPROTOTYPE:
378             break;
379 #endif
380          default:
381             *fatal = 1;
382             Qmsg3(jcr, M_ERROR, 0,  _("Socket open error. proto=%d port=%d. ERR=%s\n"),
383                ipaddr->get_family(), ipaddr->get_port_host_order(), be.bstrerror());
384             Pmsg3(300, _("Socket open error. proto=%d port=%d. ERR=%s\n"),
385                ipaddr->get_family(), ipaddr->get_port_host_order(), be.bstrerror());
386             break;
387          }
388          continue;
389       }
390 
391       /* Bind to the source address if it is set */
392       if (src_addr) {
393          /* It is not required to use SO_REUSEADDR here as DirSourceAddress &
394           * FdSourceAddress don't force the port of the source, only the address
395           */
396          if (bind(sockfd, src_addr->get_sockaddr(), src_addr->get_sockaddr_len()) < 0) {
397             berrno be;
398             save_errno = errno;
399             *fatal = 1;
400             Qmsg2(jcr, M_ERROR, 0, _("Source address bind error. proto=%d. ERR=%s\n"),
401                   src_addr->get_family(), be.bstrerror() );
402             Pmsg2(000, _("Source address bind error. proto=%d. ERR=%s\n"),
403                   src_addr->get_family(), be.bstrerror() );
404             if (sockfd >= 0) socketClose(sockfd);
405             continue;
406          }
407       }
408 
409       /*
410        * Keep socket from timing out from inactivity
411        */
412       if (setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, (sockopt_val_t)&turnon, sizeof(turnon)) < 0) {
413          berrno be;
414          Qmsg1(jcr, M_WARNING, 0, _("Cannot set SO_KEEPALIVE on socket: %s\n"),
415                be.bstrerror());
416       }
417 #if defined(TCP_KEEPIDLE)
418       if (heart_beat) {
419          int opt = heart_beat;
420          if (setsockopt(sockfd, SOL_TCP, TCP_KEEPIDLE, (sockopt_val_t)&opt, sizeof(opt)) < 0) {
421             berrno be;
422             Qmsg1(jcr, M_WARNING, 0, _("Cannot set TCP_KEEPIDLE on socket: %s\n"),
423                   be.bstrerror());
424          }
425       }
426 #endif
427 
428       /* connect to server */
429       if (::connect(sockfd, ipaddr->get_sockaddr(), ipaddr->get_sockaddr_len()) < 0) {
430          save_errno = errno;
431          if (sockfd >= 0) socketClose(sockfd);
432          continue;
433       }
434       *fatal = 0;
435       connected = true;
436       break;
437    }
438 
439    if (!connected) {
440       berrno be;
441       free_addresses(addr_list);
442       errno = save_errno | b_errno_win32;
443       Dmsg4(50, "Could not connect to server %s %s:%d. ERR=%s\n",
444             name, host, port, be.bstrerror());
445       return false;
446    }
447    /*
448     * Keep socket from timing out from inactivity
449     *   Do this a second time out of paranoia
450     */
451    if (setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, (sockopt_val_t)&turnon, sizeof(turnon)) < 0) {
452       berrno be;
453       Qmsg1(jcr, M_WARNING, 0, _("Cannot set SO_KEEPALIVE on socket: %s\n"),
454             be.bstrerror());
455    }
456    fin_init(jcr, sockfd, name, host, port, ipaddr->get_sockaddr());
457    free_addresses(addr_list);
458 
459    /* Clean the packet a bit */
460    m_closed = false;
461    m_duped = false;
462    // Moved to BSOCK m_spool = false;
463    m_use_locking = false;
464    m_timed_out = false;
465    m_terminated = false;
466    m_suppress_error_msgs = false;
467    errors = 0;
468    m_blocking = 0;
469 
470 #ifdef INET6_ADDRSTRLEN
471    char info[2*INET6_ADDRSTRLEN+20];
472    Dmsg4(50, "OK connected to server  %s %s:%d. socket=%s\n",
473          name, host, port, get_info(info, sizeof(info)));
474 #endif
475    return true;
476 }
477 
478 /*
479  * Force read/write to use locking
480  */
set_locking()481 bool BSOCKCORE::set_locking()
482 {
483    int stat;
484    if (m_use_locking) {
485       return true;                      /* already set */
486    }
487    pm_rmutex = &m_rmutex;
488    pm_wmutex = &m_wmutex;
489    if ((stat = pthread_mutex_init(pm_rmutex, NULL)) != 0) {
490       berrno be;
491       Qmsg(m_jcr, M_FATAL, 0, _("Could not init bsockcore read mutex. ERR=%s\n"),
492          be.bstrerror(stat));
493       return false;
494    }
495    if ((stat = pthread_mutex_init(pm_wmutex, NULL)) != 0) {
496       berrno be;
497       Qmsg(m_jcr, M_FATAL, 0, _("Could not init bsockcore write mutex. ERR=%s\n"),
498          be.bstrerror(stat));
499       return false;
500    }
501    if ((stat = pthread_mutex_init(&m_mmutex, NULL)) != 0) {
502       berrno be;
503       Qmsg(m_jcr, M_FATAL, 0, _("Could not init bsockcore attribute mutex. ERR=%s\n"),
504          be.bstrerror(stat));
505       return false;
506    }
507    m_use_locking = true;
508    return true;
509 }
510 
clear_locking()511 void BSOCKCORE::clear_locking()
512 {
513    if (!m_use_locking || m_duped) {
514       return;
515    }
516    m_use_locking = false;
517    pthread_mutex_destroy(pm_rmutex);
518    pthread_mutex_destroy(pm_wmutex);
519    pthread_mutex_destroy(&m_mmutex);
520    pm_rmutex = NULL;
521    pm_wmutex = NULL;
522    return;
523 }
524 
525 /*
526  * Send a message over the network. Everything is sent in one write request.
527  *
528  * Returns: false on failure
529  *          true  on success
530  */
send()531 bool BSOCKCORE::send()
532 {
533    int32_t rc;
534    bool ok = true;
535    bool locked = false;
536 
537    if (is_closed()) {
538       if (!m_suppress_error_msgs) {
539          Qmsg0(m_jcr, M_ERROR, 0,  _("Socket is closed\n"));
540       }
541       return false;
542    }
543    if (errors) {
544       if (!m_suppress_error_msgs) {
545          Qmsg4(m_jcr, M_ERROR, 0,  _("Socket has errors=%d on call to %s:%s:%d\n"),
546              errors, m_who, m_host, m_port);
547       }
548       return false;
549    }
550    if (is_terminated()) {
551       if (!m_suppress_error_msgs) {
552          Qmsg4(m_jcr, M_ERROR, 0,  _("BSOCKCORE send while terminated=%d on call to %s:%s:%d\n"),
553              is_terminated(), m_who, m_host, m_port);
554       }
555       return false;
556    }
557 
558    if (msglen > 4000000) {
559       if (!m_suppress_error_msgs) {
560          Qmsg4(m_jcr, M_ERROR, 0,
561             _("Socket has insane msglen=%d on call to %s:%s:%d\n"),
562              msglen, m_who, m_host, m_port);
563       }
564       return false;
565    }
566 
567    if (send_hook_cb) {
568       if (!send_hook_cb->bsock_send_cb()) {
569          Dmsg3(1, "Flowcontrol failure on %s:%s:%d\n", m_who, m_host, m_port);
570          Qmsg3(m_jcr, M_ERROR, 0, _("Flowcontrol failure on %s:%s:%d\n"), m_who, m_host, m_port);
571          return false;
572       }
573    }
574    if (m_use_locking) {
575       pP(pm_wmutex);
576       locked = true;
577    }
578 
579    (*pout_msg_no)++;        /* increment message number */
580 
581    /* send data packet */
582    timer_start = watchdog_time;  /* start timer */
583    clear_timed_out();
584    /* Full I/O done in one write */
585    rc = write_nbytes(msg, msglen);
586    if (chk_dbglvl(DT_NETWORK|1900)) dump_bsock_msg(m_fd, *pout_msg_no, "SEND", rc, msglen, m_flags, msg, msglen);
587    timer_start = 0;         /* clear timer */
588    if (rc != msglen) {
589       errors++;
590       if (errno == 0) {
591          b_errno = EIO;
592       } else {
593          b_errno = errno;
594       }
595       if (rc < 0) {
596          if (!m_suppress_error_msgs) {
597             Qmsg5(m_jcr, M_ERROR, 0,
598                   _("Write error sending %d bytes to %s:%s:%d: ERR=%s\n"),
599                   msglen, m_who,
600                   m_host, m_port, this->bstrerror());
601          }
602       } else {
603          Qmsg5(m_jcr, M_ERROR, 0,
604                _("Wrote %d bytes to %s:%s:%d, but only %d accepted.\n"),
605                msglen, m_who, m_host, m_port, rc);
606       }
607       ok = false;
608    }
609 //   Dmsg4(000, "cmpr=%d ext=%d cmd=%d m_flags=0x%x\n", msglen&BNET_COMPRESSED?1:0,
610 //      msglen&BNET_HDR_EXTEND?1:0, msglen&BNET_CMD_BIT?1:0, m_flags);
611    if (locked) pV(pm_wmutex);
612    return ok;
613 }
614 
615 /*
616  * Format and send a message
617  *  Returns: false on error
618  *           true  on success
619  */
fsend(const char * fmt,...)620 bool BSOCKCORE::fsend(const char *fmt, ...)
621 {
622    va_list arg_ptr;
623    int maxlen;
624 
625    if (is_null(this)) {
626       return false;                /* do not seg fault */
627    }
628    if (errors || is_terminated() || is_closed()) {
629       return false;
630    }
631    /* This probably won't work, but we vsnprintf, then if we
632     * get a negative length or a length greater than our buffer
633     * (depending on which library is used), the printf was truncated, so
634     * get a bigger buffer and try again.
635     */
636    for (;;) {
637       maxlen = sizeof_pool_memory(msg) - 1;
638       va_start(arg_ptr, fmt);
639       msglen = bvsnprintf(msg, maxlen, fmt, arg_ptr);
640       va_end(arg_ptr);
641       if (msglen >= 0 && msglen < (maxlen - 5)) {
642          break;
643       }
644       msg = realloc_pool_memory(msg, maxlen + maxlen / 2);
645    }
646    return send();
647 }
648 
649 /*
650  * Receive a data from the other end.
651  * The number of expected bytes in len.
652  * Returns number of bytes read (may return zero), the msglen is set accordingly.
653  * Returns -1 on error so msglen will be zero.
654  */
recvn(int len)655 int32_t BSOCKCORE::recvn(int len)
656 {
657    /* The method has to be redesigned from scratch */
658    int32_t nbytes;
659    bool locked = false;
660 
661    msglen = nbytes = 0;
662    msg[msglen] = 0;
663    if (errors || is_terminated() || is_closed()) {
664       /* error, cannot receive */
665       return -1;
666    }
667 
668    if (len > 0) {
669       /* do read only when len > 0 */
670       if (m_use_locking) {
671          pP(pm_rmutex);
672          locked = true;
673       }
674       read_seqno++;                 /* bump sequence number */
675       timer_start = watchdog_time;  /* set start wait time */
676       clear_timed_out();
677       /* Make sure the buffer is big enough + one byte for EOS */
678       if (len >= (int32_t) sizeof_pool_memory(msg)) {
679          msg = realloc_pool_memory(msg, len + 100);
680       }
681       timer_start = watchdog_time;  /* set start wait time */
682       clear_timed_out();
683       if ((nbytes = read_nbytes(msg, len)) <= 0) {
684          timer_start = 0;      /* clear timer */
685          /* probably pipe broken because client died */
686          if (errno == 0) {
687             b_errno = ENODATA;
688          } else {
689             b_errno = errno;
690          }
691          nbytes = -1;
692          errors++;
693          msglen = 0;        /* assume hard EOF received */
694          Qmsg4(m_jcr, M_ERROR, 0, _("Read error from %s:%s:%d: ERR=%s\n"),
695                m_who, m_host, m_port, this->bstrerror());
696          goto bailout;
697       }
698       timer_start = 0;         /* clear timer */
699       in_msg_no++;
700       msglen = nbytes;
701       /*
702        * always add a zero by to properly terminate any
703        * string that was send to us. Note, we ensured above that the
704        * buffer is at least one byte longer than the message length.
705        */
706       msg[nbytes] = 0; /* terminate in case it is a string */
707       /*
708        * The following uses *lots* of resources so turn it on only for
709        * serious debugging.
710        */
711       Dsm_check(300);
712    }
713 
714 bailout:
715    if ((chk_dbglvl(DT_NETWORK|1900))) dump_bsock_msg(m_fd, read_seqno, "GRECV", nbytes, len, m_flags, msg, msglen);
716 
717    if (locked) pV(pm_rmutex);
718    return nbytes;                  /* return actual length of message or -1 */
719 }
720 
721 /*
722  * Return the string for the error that occurred
723  * on the socket. Only the first error is retained.
724  */
bstrerror()725 const char *BSOCKCORE::bstrerror()
726 {
727    berrno be;
728    if (errmsg == NULL) {
729       errmsg = get_pool_memory(PM_MESSAGE);
730    }
731    if (b_errno == 0) {
732       pm_strcpy(errmsg, "I/O Error");
733    } else {
734       pm_strcpy(errmsg, be.bstrerror(b_errno));
735    }
736    return errmsg;
737 }
738 
get_peer(char * buf,socklen_t buflen)739 int BSOCKCORE::get_peer(char *buf, socklen_t buflen)
740 {
741 #if defined(HAVE_INET_NTOP)
742     if (peer_addr.sin_family == 0) {
743         socklen_t salen = sizeof(peer_addr);
744         int rval = (getpeername)(m_fd, (struct sockaddr *)&peer_addr, &salen);
745         if (rval < 0) return rval;
746     }
747     if (!inet_ntop(peer_addr.sin_family, &peer_addr.sin_addr, buf, buflen))
748         return -1;
749 
750     return 0;
751 #else
752     return -1;
753 #endif
754 }
755 
756 /* return the address and port of the source and the destination of a socket
757  * in the format S.S.S.S.port_S:D.D.D.D.port_D
758  */
get_info(char * buf,int buflen)759 char* BSOCKCORE::get_info(char *buf, int buflen)
760 {
761 #ifndef HAVE_WIN32
762 #ifdef  INET6_ADDRSTRLEN
763    socklen_t len;
764    struct sockaddr_storage addr;
765    char ipstr_s[INET6_ADDRSTRLEN], ipstr_d[INET6_ADDRSTRLEN];
766    int port_s, port_d;
767 
768    len = sizeof addr;
769    if (getsockname(m_fd, (struct sockaddr*)&addr, &len) != 0) {
770       goto error;
771    }
772 
773    // source address, deal with both IPv4 and IPv6:
774    if (addr.ss_family == AF_INET) {
775       struct sockaddr_in *s = (struct sockaddr_in *)&addr;
776       port_s = ntohs(s->sin_port);
777       inet_ntop(AF_INET, &s->sin_addr, ipstr_s, sizeof ipstr_s);
778    } else { // AF_INET6
779       struct sockaddr_in6 *s = (struct sockaddr_in6 *)&addr;
780       port_s = ntohs(s->sin6_port);
781       inet_ntop(AF_INET6, &s->sin6_addr, ipstr_s, sizeof ipstr_s);
782    }
783 
784    len = sizeof addr;
785    if (getpeername(m_fd, (struct sockaddr*)&addr, &len) != 0) {
786       goto error;
787    }
788 
789    // destination address, deal with both IPv4 and IPv6:
790    if (addr.ss_family == AF_INET) {
791       struct sockaddr_in *s = (struct sockaddr_in *)&addr;
792       port_d = ntohs(s->sin_port);
793       inet_ntop(AF_INET, &s->sin_addr, ipstr_d, sizeof ipstr_s);
794    } else { // AF_INET6
795       struct sockaddr_in6 *s = (struct sockaddr_in6 *)&addr;
796       port_d = ntohs(s->sin6_port);
797       inet_ntop(AF_INET6, &s->sin6_addr, ipstr_d, sizeof ipstr_s);
798    }
799 
800    bsnprintf(buf, buflen, "%s.%d:%s.%d s=0x%p", ipstr_s, port_s, ipstr_d, port_d, this);
801    return buf;
802 error:
803 #endif  /* INET6 */
804 #endif  /* !WIN32 */
805    buf[0]='\0';
806    return buf;
807 }
808 /*
809  * Set the network buffer size, suggested size is in size.
810  *  Actual size obtained is returned in bs->msglen
811  *
812  *  Returns: false on failure
813  *           true  on success
814  */
set_buffer_size(uint32_t size,int rw)815 bool BSOCKCORE::set_buffer_size(uint32_t size, int rw)
816 {
817    uint32_t dbuf_size, start_size;
818 
819 #if defined(IP_TOS) && defined(IPTOS_THROUGHPUT)
820    int opt;
821    opt = IPTOS_THROUGHPUT;
822    setsockopt(m_fd, IPPROTO_IP, IP_TOS, (sockopt_val_t)&opt, sizeof(opt));
823 #endif
824 
825    if (size != 0) {
826       dbuf_size = size;
827    } else {
828       dbuf_size = DEFAULT_NETWORK_BUFFER_SIZE;
829    }
830    start_size = dbuf_size;
831    /* The extra 512 can hold data such as Sparse/Offset pointer */
832    if ((msg = realloc_pool_memory(msg, dbuf_size + 512)) == NULL) {
833       Qmsg0(get_jcr(), M_FATAL, 0, _("Could not malloc BSOCKCORE data buffer\n"));
834       return false;
835    }
836 
837    /*
838     * If user has not set the size, use the OS default -- i.e. do not
839     *   try to set it.  This allows sys admins to set the size they
840     *   want in the OS, and Bacula will comply. See bug #1493
841     */
842    if (size == 0) {
843       msglen = dbuf_size;
844       return true;
845    }
846 
847    if (rw & BNET_SETBUF_READ) {
848       while ((dbuf_size > TAPE_BSIZE) && (setsockopt(m_fd, SOL_SOCKET,
849               SO_RCVBUF, (sockopt_val_t) & dbuf_size, sizeof(dbuf_size)) < 0)) {
850          berrno be;
851          Qmsg1(get_jcr(), M_ERROR, 0, _("sockopt error: %s\n"), be.bstrerror());
852          dbuf_size -= TAPE_BSIZE;
853       }
854       Dmsg1(200, "set network buffer size=%d\n", dbuf_size);
855       if (dbuf_size != start_size) {
856          Qmsg1(get_jcr(), M_WARNING, 0,
857                _("Warning network buffer = %d bytes not max size.\n"), dbuf_size);
858       }
859    }
860    if (size != 0) {
861       dbuf_size = size;
862    } else {
863       dbuf_size = DEFAULT_NETWORK_BUFFER_SIZE;
864    }
865    start_size = dbuf_size;
866    if (rw & BNET_SETBUF_WRITE) {
867       while ((dbuf_size > TAPE_BSIZE) && (setsockopt(m_fd, SOL_SOCKET,
868               SO_SNDBUF, (sockopt_val_t) & dbuf_size, sizeof(dbuf_size)) < 0)) {
869          berrno be;
870          Qmsg1(get_jcr(), M_ERROR, 0, _("sockopt error: %s\n"), be.bstrerror());
871          dbuf_size -= TAPE_BSIZE;
872       }
873       Dmsg1(900, "set network buffer size=%d\n", dbuf_size);
874       if (dbuf_size != start_size) {
875          Qmsg1(get_jcr(), M_WARNING, 0,
876                _("Warning network buffer = %d bytes not max size.\n"), dbuf_size);
877       }
878    }
879 
880    msglen = dbuf_size;
881    return true;
882 }
883 
884 /*
885  * Set socket non-blocking
886  * Returns previous socket flag
887  */
set_nonblocking()888 int BSOCKCORE::set_nonblocking()
889 {
890    int oflags;
891 
892    /* Get current flags */
893    if ((oflags = fcntl(m_fd, F_GETFL, 0)) < 0) {
894       berrno be;
895       Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_GETFL error. ERR=%s\n"), be.bstrerror());
896    }
897 
898    /* Set O_NONBLOCK flag */
899    if ((fcntl(m_fd, F_SETFL, oflags|O_NONBLOCK)) < 0) {
900       berrno be;
901       Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_SETFL error. ERR=%s\n"), be.bstrerror());
902    }
903 
904    m_blocking = 0;
905    return oflags;
906 }
907 
908 /*
909  * Set socket blocking
910  * Returns previous socket flags
911  */
set_blocking()912 int BSOCKCORE::set_blocking()
913 {
914    int oflags;
915    /* Get current flags */
916    if ((oflags = fcntl(m_fd, F_GETFL, 0)) < 0) {
917       berrno be;
918       Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_GETFL error. ERR=%s\n"), be.bstrerror());
919    }
920 
921    /* Set O_NONBLOCK flag */
922    if ((fcntl(m_fd, F_SETFL, oflags & ~O_NONBLOCK)) < 0) {
923       berrno be;
924       Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_SETFL error. ERR=%s\n"), be.bstrerror());
925    }
926 
927    m_blocking = 1;
928    return oflags;
929 }
930 
set_killable(bool killable)931 void BSOCKCORE::set_killable(bool killable)
932 {
933    if (m_jcr) {
934       m_jcr->set_killable(killable);
935    }
936 }
937 
938 /*
939  * Restores socket flags
940  */
restore_blocking(int flags)941 void BSOCKCORE::restore_blocking (int flags)
942 {
943    if ((fcntl(m_fd, F_SETFL, flags)) < 0) {
944       berrno be;
945       Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_SETFL error. ERR=%s\n"), be.bstrerror());
946    }
947 
948    m_blocking = (flags & O_NONBLOCK) ? true : false;
949 }
950 
951 /*
952  * Wait for a specified time for data to appear on
953  * the BSOCKCORE connection.
954  *
955  *   Returns: 1 if data available
956  *            0 if timeout
957  *           -1 if error
958  */
wait_data(int sec,int msec)959 int BSOCKCORE::wait_data(int sec, int msec)
960 {
961    for (;;) {
962       switch (fd_wait_data(m_fd, WAIT_READ, sec, msec)) {
963       case 0:                      /* timeout */
964          b_errno = 0;
965          return 0;
966       case -1:
967          b_errno = errno;
968          if (errno == EINTR) {
969             continue;
970          }
971          return -1;                /* error return */
972       default:
973          b_errno = 0;
974 #ifdef HAVE_TLS
975          if (this->tls && !tls_bsock_probe(this)) {
976             continue; /* false alarm, maybe a session key negotiation in progress on the socket */
977          }
978 #endif
979          return 1;
980       }
981    }
982 }
983 
984 /*
985  * As above, but returns on interrupt
986  */
wait_data_intr(int sec,int msec)987 int BSOCKCORE::wait_data_intr(int sec, int msec)
988 {
989    switch (fd_wait_data(m_fd, WAIT_READ, sec, msec)) {
990    case 0:                      /* timeout */
991       b_errno = 0;
992       return 0;
993    case -1:
994       b_errno = errno;
995       return -1;                /* error return */
996    default:
997       b_errno = 0;
998 #ifdef HAVE_TLS
999       if (this->tls && !tls_bsock_probe(this)) {
1000          /* maybe a session key negotiation waked up the socket */
1001          return 0;
1002       }
1003 #endif
1004       break;
1005    }
1006    return 1;
1007 }
1008 
/*
 * Fallback definition of SHUT_RDWR for platforms whose headers lack it.
 * It is used by BSOCKCORE::close() below, the routine that closes the
 * current BSOCKCORE.  close() does not delete the socket packet
 * resources, which are released in bsock->destroy().
 */
1014 #ifndef SHUT_RDWR
1015 #define SHUT_RDWR 2
1016 #endif
1017 
1018 /*
1019  * The JCR is canceled, set terminate for chained BSOCKCOREs starting from master
1020  */
cancel()1021 void BSOCKCORE::cancel()
1022 {
1023    master_lock();
1024    for (BSOCKCORE *next = m_master; next != NULL; next = next->m_next) {
1025       if (!next->m_closed) {
1026          next->m_terminated = true;
1027          next->m_timed_out = true;
1028       }
1029    }
1030    master_unlock();
1031 }
1032 
1033 /*
1034  * Note, this routine closes the socket, but leaves the
1035  *   bsockcore memory in place.
1036  *   every thread is responsible of closing and destroying its own duped or not
1037  *   duped BSOCKCORE
1038  */
close()1039 void BSOCKCORE::close()
1040 {
1041    BSOCKCORE *bsock = this;
1042 
1043    Dmsg0(BSOCKCORE_DEBUG_LVL, "BSOCKCORE::close()\n");
1044    if (bsock->is_closed()) {
1045       return;
1046    }
1047    if (!m_duped) {
1048       clear_locking();
1049    }
1050    bsock->set_closed();
1051    bsock->set_terminated();
1052    if (!bsock->m_duped) {
1053       /* Shutdown tls cleanly. */
1054       if (bsock->tls) {
1055          tls_bsock_shutdown(bsock);
1056          free_tls_connection(bsock->tls);
1057          bsock->tls = NULL;
1058       }
1059 #ifdef HAVE_WIN32
1060       if (!bsock->is_timed_out()) {
1061          win_close_wait(bsock->m_fd);  /* Ensure that data is not discarded */
1062       }
1063 #else
1064       if (bsock->is_timed_out()) {
1065          shutdown(bsock->m_fd, SHUT_RDWR);   /* discard any pending I/O */
1066       }
1067 #endif
1068       /* On Windows this discards data if we did not do a close_wait() */
1069       socketClose(bsock->m_fd);      /* normal close */
1070    }
1071    return;
1072 }
1073 
1074 /*
1075  * Destroy the socket (i.e. release all resources)
1076  */
_destroy()1077 void BSOCKCORE::_destroy()
1078 {
1079    Dmsg0(BSOCKCORE_DEBUG_LVL, "BSOCKCORE::_destroy()\n");
1080    this->close();                  /* Ensure that socket is closed */
1081    if (msg) {
1082       free_pool_memory(msg);
1083       msg = NULL;
1084    } else {
1085       ASSERT2(1 == 0, "Two calls to destroy socket");  /* double destroy */
1086    }
1087    if (errmsg) {
1088       free_pool_memory(errmsg);
1089       errmsg = NULL;
1090    }
1091    if (m_who) {
1092       free(m_who);
1093       m_who = NULL;
1094    }
1095    if (m_host) {
1096       free(m_host);
1097       m_host = NULL;
1098    }
1099    if (src_addr) {
1100       free(src_addr);
1101       src_addr = NULL;
1102    }
1103 }
1104 
1105 /*
1106  * Destroy the socket (i.e. release all resources)
1107  * including duped sockets.
1108  * should not be called from duped BSOCKCORE
1109  */
destroy()1110 void BSOCKCORE::destroy()
1111 {
1112    Dmsg0(BSOCKCORE_DEBUG_LVL, "BSOCKCORE::destroy()\n");
1113    ASSERTD(reinterpret_cast<uintptr_t>(m_next) != 0xaaaaaaaaaaaaaaaa, "BSOCKCORE::destroy() already called\n")
1114    ASSERTD(this == m_master, "BSOCKCORE::destroy() called by a non master BSOCKCORE\n")
1115    ASSERTD(!m_duped, "BSOCKCORE::destroy() called by a duped BSOCKCORE\n")
1116    /* I'm the master I must destroy() all the duped BSOCKCOREs */
1117    master_lock();
1118    BSOCKCORE *ahead;
1119    for (BSOCKCORE *next = m_next; next != NULL; next = ahead) {
1120       ahead = next->m_next;
1121       Dmsg1(BSOCKCORE_DEBUG_LVL, "BSOCKCORE::destroy():delete(%p)\n", next);
1122       delete(next);
1123    }
1124    master_unlock();
1125    Dmsg0(BSOCKCORE_DEBUG_LVL, "BSOCKCORE::destroy():delete(this)\n");
1126    delete(this);
1127 }
1128 
1129 /* Try to limit the bandwidth of a network connection
1130  */
control_bwlimit(int bytes)1131 void BSOCKCORE::control_bwlimit(int bytes)
1132 {
1133    btime_t now, temp;
1134    if (bytes == 0) {
1135       return;
1136    }
1137 
1138    now = get_current_btime();          /* microseconds */
1139    temp = now - m_last_tick;           /* microseconds */
1140 
1141    m_nb_bytes += bytes;
1142 
1143    if (temp < 0 || temp > 10000000) { /* Take care of clock problems (>10s) or back in time */
1144       m_nb_bytes = bytes;
1145       m_last_tick = now;
1146       return;
1147    }
1148 
1149    /* Less than 0.1ms since the last call, see the next time */
1150    if (temp < 100) {
1151       return;
1152    }
1153 
1154    /* Remove what was authorised to be written in temp us */
1155    m_nb_bytes -= (int64_t)(temp * ((double)m_bwlimit / 1000000.0));
1156 
1157    if (m_nb_bytes < 0) {
1158       m_nb_bytes = 0;
1159    }
1160 
1161    /* What exceed should be converted in sleep time */
1162    int64_t usec_sleep = (int64_t)(m_nb_bytes /((double)m_bwlimit / 1000000.0));
1163    if (usec_sleep > 100) {
1164       bmicrosleep(usec_sleep/1000000, usec_sleep%1000000); /* TODO: Check that bmicrosleep slept enough or sleep again */
1165       m_last_tick = get_current_btime();
1166       m_nb_bytes = 0;
1167    } else {
1168       m_last_tick = now;
1169    }
1170 }
1171 
1172 /*
1173  * Write nbytes to the network.
1174  * It may require several writes.
1175  */
1176 
write_nbytes(char * ptr,int32_t nbytes)1177 int32_t BSOCKCORE::write_nbytes(char *ptr, int32_t nbytes)
1178 {
1179    int32_t nleft, nwritten;
1180 
1181 #ifdef HAVE_TLS
1182    if (tls) {
1183       /* TLS enabled */
1184       return (tls_bsock_writen((BSOCK*)this, ptr, nbytes));
1185    }
1186 #endif /* HAVE_TLS */
1187 
1188    nleft = nbytes;
1189    while (nleft > 0) {
1190       do {
1191          errno = 0;
1192          nwritten = socketWrite(m_fd, ptr, nleft);
1193          if (is_timed_out() || is_terminated()) {
1194             return -1;
1195          }
1196 
1197 #ifdef HAVE_WIN32
1198          /*
1199           * We simulate errno on Windows for a socket
1200           *  error in order to handle errors correctly.
1201           */
1202          if (nwritten == SOCKET_ERROR) {
1203             DWORD err = WSAGetLastError();
1204             nwritten = -1;
1205             if (err == WSAEINTR) {
1206                errno = EINTR;
1207             } else if (err == WSAEWOULDBLOCK) {
1208                errno = EAGAIN;
1209             } else {
1210                errno = EIO;        /* some other error */
1211             }
1212          }
1213 #endif
1214 
1215       } while (nwritten == -1 && errno == EINTR);
1216       /*
1217        * If connection is non-blocking, we will get EAGAIN, so
1218        * use select()/poll to keep from consuming all the CPU
1219        * and try again.
1220        */
1221       if (nwritten == -1 && errno == EAGAIN) {
1222          fd_wait_data(m_fd, WAIT_WRITE, 1, 0);
1223          continue;
1224       }
1225       if (nwritten <= 0) {
1226          return -1;                /* error */
1227       }
1228       nleft -= nwritten;
1229       ptr += nwritten;
1230       if (use_bwlimit()) {
1231          control_bwlimit(nwritten);
1232       }
1233    }
1234    return nbytes - nleft;
1235 }
1236 
1237 /*
1238  * Read a nbytes from the network.
1239  * It is possible that the total bytes require in several
1240  * read requests
1241  */
1242 
read_nbytes(char * ptr,int32_t nbytes)1243 int32_t BSOCKCORE::read_nbytes(char *ptr, int32_t nbytes)
1244 {
1245    int32_t nleft, nread;
1246 
1247 #ifdef HAVE_TLS
1248    if (tls) {
1249       /* TLS enabled */
1250       return (tls_bsock_readn((BSOCK*)this, ptr, nbytes));
1251    }
1252 #endif /* HAVE_TLS */
1253 
1254    nleft = nbytes;
1255    while (nleft > 0) {
1256       errno = 0;
1257       nread = socketRead(m_fd, ptr, nleft);
1258       if (is_timed_out() || is_terminated()) {
1259          return -1;
1260       }
1261 
1262 #ifdef HAVE_WIN32
1263       /*
1264        * We simulate errno on Windows for a socket
1265        *  error in order to handle errors correctly.
1266        */
1267       if (nread == SOCKET_ERROR) {
1268         DWORD err = WSAGetLastError();
1269         nread = -1;
1270         if (err == WSAEINTR) {
1271            errno = EINTR;
1272         } else if (err == WSAEWOULDBLOCK) {
1273            errno = EAGAIN;
1274         } else {
1275            errno = EIO;            /* some other error */
1276         }
1277      }
1278 #endif
1279 
1280       if (nread == -1) {
1281          if (errno == EINTR) {
1282             continue;
1283          }
1284          if (errno == EAGAIN) {
1285             bmicrosleep(0, 20000);  /* try again in 20ms */
1286             continue;
1287          }
1288       }
1289       if (nread <= 0) {
1290          return -1;                /* error, or EOF */
1291       }
1292       nleft -= nread;
1293       ptr += nread;
1294       if (use_bwlimit()) {
1295          control_bwlimit(nread);
1296       }
1297    }
1298    return nbytes - nleft;          /* return >= 0 */
1299 }
1300 
1301 #ifdef HAVE_WIN32
1302 /*
1303  * closesocket is supposed to do a graceful disconnect under Window
1304  *   but it doesn't. Comments on http://msdn.microsoft.com/en-us/li
1305  *   confirm this behaviour. DisconnectEx is required instead, but
1306  *   that function needs to be retrieved via WS IOCTL
1307  */
1308 static void
win_close_wait(int fd)1309 win_close_wait(int fd)
1310 {
1311    int ret;
1312    GUID disconnectex_guid = WSAID_DISCONNECTEX;
1313    DWORD bytes_returned;
1314    LPFN_DISCONNECTEX DisconnectEx;
1315    ret = WSAIoctl(fd, SIO_GET_EXTENSION_FUNCTION_POINTER, &disconnectex_guid, sizeof(disconnectex_guid), &DisconnectEx, sizeof(DisconnectEx), &bytes_returned, NULL, NULL);
1316    Dmsg1(100, "WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER, WSAID_DISCONNECTEX) ret = %d\n", ret);
1317    if (!ret) {
1318       DisconnectEx(fd, NULL, 0, 0);
1319    }
1320 }
1321 #endif
1322 
1323 #ifndef TEST_PROGRAM
1324 #define TEST_PROGRAM_A
1325 #endif
1326 
dump()1327 void BSOCKCORE::dump()
1328 {
1329 #ifdef TEST_PROGRAM
1330    char ed1[50];
1331    Pmsg1(-1, "BSOCKCORE::dump(): %p\n", this);
1332    Pmsg1(-1, "\tmsg: %p\n", msg);
1333    Pmsg1(-1, "\terrmsg: %p\n", errmsg);
1334    Pmsg1(-1, "\tres: %p\n", res);
1335    Pmsg1(-1, "\ttls: %p\n", tls);
1336    Pmsg1(-1, "\tsrc_addr: %p\n", src_addr);
1337    Pmsg1(-1, "\tread_seqno: %s\n", edit_uint64(read_seqno, ed1));
1338    Pmsg1(-1, "\tin_msg_no: %s\n", edit_uint64(in_msg_no, ed1));
1339    Pmsg1(-1, "\tout_msg_no: %s\n", edit_uint64(out_msg_no, ed1));
1340    Pmsg1(-1, "\tpout_msg_no: %p\n", pout_msg_no);
1341    Pmsg1(-1, "\tmsglen: %s\n", edit_int64(msglen, ed1));
1342    Pmsg1(-1, "\ttimer_start: %ld\n", timer_start);
1343    Pmsg1(-1, "\ttimeout: %ld\n", timeout);
1344    Pmsg1(-1, "\tm_fd: %d\n", m_fd);
1345    Pmsg1(-1, "\tb_errno: %d\n", b_errno);
1346    Pmsg1(-1, "\tm_blocking: %d\n", m_blocking);
1347    Pmsg1(-1, "\terrors: %d\n", errors);
1348    Pmsg1(-1, "\tm_suppress_error_msgs: %s\n", m_suppress_error_msgs?"true":"false");
1349 //   Pmsg1(0, "\tclient_addr:{ } struct sockaddr client_addr;       /* client's IP address */
1350 //   Pmsg1(0, "\tstruct sockaddr_in peer_addr;      /* peer's IP address */
1351    Pmsg1(-1, "\tsend_hook_cb: %p\n", send_hook_cb);
1352    Pmsg1(-1, "\tm_master: %p\n", m_master);
1353    Pmsg1(-1, "\tm_next: %p\n", m_next);
1354    Pmsg1(-1, "\tm_jcr: %p\n", m_jcr);
1355 //   pthread_mutex_t m_rmutex;          /* for read locking if use_locking set */
1356 //   pthread_mutex_t m_wmutex;          /* for write locking if use_locking set */
1357 //   mutable pthread_mutex_t m_mmutex;  /* when accessing the master/next chain */
1358 //   pthread_mutex_t *pm_rmutex;        /* Pointer to the read mutex */
1359 //   pthread_mutex_t *pm_wmutex;        /* Pointer to the write mutex */
1360    Pmsg1(-1, "\tm_who: %p\n", m_who);
1361    Pmsg1(-1, "\tm_host: %p\n", m_host);
1362    Pmsg1(-1, "\tm_port: %d\n", m_port);
1363    Pmsg1(-1, "\tm_tid: %p\n", m_tid);
1364    Pmsg1(-1, "\tm_flags: %s\n", edit_uint64(m_flags, ed1));
1365    Pmsg1(-1, "\tm_timed_out: %s\n", m_timed_out?"true":"false");
1366    Pmsg1(-1, "\tm_terminated: %s\n", m_terminated?"true":"false");
1367    Pmsg1(-1, "\tm_closed: %s\n", m_closed?"true":"false");
1368    Pmsg1(-1, "\tm_duped: %s\n", m_duped?"true":"false");
1369    Pmsg1(-1, "\tm_use_locking: %s\n", m_use_locking?"true":"false");
1370    Pmsg1(-1, "\tm_bwlimit: %s\n", edit_int64(m_bwlimit, ed1));
1371    Pmsg1(-1, "\tm_nb_bytes: %s\n", edit_int64(m_nb_bytes, ed1));
1372    Pmsg1(-1, "\tm_last_tick: %s\n", edit_int64(m_last_tick, ed1));
1373 #endif
1374 };
1375 
1376 
1377 #ifdef TEST_PROGRAM
1378 #include "unittests.h"
1379 
free_my_jcr(JCR * jcr)1380 void free_my_jcr(JCR *jcr){
1381    /* TODO: handle full JCR free */
1382    free_jcr(jcr);
1383 };
1384 
1385 #define  ofnamefmt      "/tmp/bsockcore.%d.test"
1386 const char *data =      "This is a BSOCKCORE communication test: 1234567\n";
1387 const char *hexdata =   "< 00000000 54 68 69 73 20 69 73 20 61 20 42 53 4f 43 4b 43 # This is a BSOCKC\n" \
1388                         "< 00000010 4f 52 45 20 63 6f 6d 6d 75 6e 69 63 61 74 69 6f # ORE communicatio\n" \
1389                         "< 00000020 6e 20 74 65 73 74 3a 20 31 32 33 34 35 36 37 0a # n test: 1234567.\n";
1390 
main()1391 int main()
1392 {
1393    Unittests bsockcore_test("bsockcore_test", true);
1394    BSOCKCORE *bs;
1395    pid_t pid;
1396    int rc;
1397    char *host = (char*)"localhost";
1398    char *name = (char*)"Test";
1399    JCR *jcr;
1400    bool btest;
1401    char buf[256];       // extend this buffer when hexdata becomes longer
1402    int fd;
1403 
1404    Pmsg0(0, "Initialize tests ...\n");
1405 
1406    jcr = new_jcr(sizeof(JCR), NULL);
1407    bs = New(BSOCKCORE);
1408    bs->set_jcr(jcr);
1409    ok(bs != NULL && bs->jcr() == jcr,
1410          "Default initialization");
1411 
1412    Pmsg0(0, "Preparing fork\n");
1413    pid = fork();
1414    if (0 == pid){
1415       Pmsg0(0, "Prepare to execute netcat\n");
1416       pid_t mypid = getpid();
1417       char ofname[30];
1418       snprintf(ofname, sizeof(ofname), ofnamefmt, mypid);
1419       rc = execl("/bin/netcat", "netcat", "-v", "-p", "20000", "-l", "-o", ofname, NULL);
1420       Pmsg1(0, "Error executing netcat: %s\n", strerror(rc));
1421       exit(1);
1422    }
1423    Pmsg1(0, "After fork: %d\n", pid);
1424    bmicrosleep(2, 0);      // we wait a bit to netcat to start
1425    btest = bs->connect(jcr, 1, 10, 0, name, host, NULL, 20000, 0);
1426    ok(btest, "BSOCKCORE connection test");
1427    if (btest){
1428       /* we are connected, so send some data */
1429       bs->fsend("%s", data);
1430       bmicrosleep(2, 0);      // wait until data received by netcat
1431       bs->close();
1432       ok(bs->is_closed(), "Close bsockcore");
1433       /* now check what netcat received */
1434       char ofname[30];
1435       snprintf(ofname, sizeof(ofname), ofnamefmt, pid);
1436       fd = open(ofname, O_RDONLY);
1437       btest = false;
1438       if (fd > 0){
1439          btest = true;
1440          read(fd, buf, strlen(hexdata));
1441          close(fd);
1442          unlink(ofname);
1443       }
1444       ok(btest, "Output file available");
1445       ok(strcmp(buf, hexdata) == 0, "Communication data");
1446    }
1447    kill(pid, SIGTERM);
1448    delete(bs);
1449    free_my_jcr(jcr);
1450    term_last_jobs_list();
1451    return report();
1452 };
1453 #endif /* TEST_PROGRAM */
1454