1 /*
2 Bacula(R) - The Network Backup Solution
3
4 Copyright (C) 2000-2020 Kern Sibbald
5
6 The original author of Bacula is Kern Sibbald, with contributions
7 from many others, a complete list can be found in the file AUTHORS.
8
9 You may use this file and others of this release according to the
10 license defined in the LICENSE file, which includes the Affero General
11 Public License, v3.0 ("AGPLv3") and some additional permissions and
12 terms pursuant to its AGPLv3 Section 7.
13
14 This notice must be preserved when any source code is
15 conveyed and/or propagated.
16
17 Bacula(R) is a registered trademark of Kern Sibbald.
18 */
19 /*
20 * Bacula Core Sock Class definition
21 *
22 * Kern Sibbald, May MM
23 *
24 * Major refactoring of BSOCK code written by:
25 *
26 * Radoslaw Korzeniewski, MMXVIII
27 * radoslaw@korzeniewski.net, radekk@inteos.pl
28 * Inteos Sp. z o.o. http://www.inteos.pl/
29 *
 * This is a common class for socket network communication, derived from the
 * BSOCK class. It acts as a base class for non-Bacula network communication
 * and as a base class for the standard BSOCK implementation. Basically, the
 * BSOCK class did not change its functionality for any Bacula-specific part.
 * Now you can use BSOCKCORE for other network communication.
35 */
36
37 #include "bacula.h"
38 #include "jcr.h"
39 #include <netdb.h>
40 #include <netinet/tcp.h>
41
42 #define BSOCKCORE_DEBUG_LVL 900
43
44 #if !defined(ENODATA) /* not defined on BSD systems */
45 #define ENODATA EPIPE
46 #endif
47
48 #if !defined(SOL_TCP) /* Not defined on some systems */
49 #define SOL_TCP IPPROTO_TCP
50 #endif
51
52 #ifdef HAVE_WIN32
53 #include <mswsock.h>
54 static void win_close_wait(int fd);
55 #ifndef SOCK_CLOEXEC
56 #define SOCK_CLOEXEC 0
57 #endif
58 #endif
59
60 /*
61 * make a nice dump of a message
62 */
dump_bsock_msg(int sock,uint32_t msgno,const char * what,uint32_t rc,int32_t pktsize,uint32_t flags,POOLMEM * amsg,int32_t amsglen)63 void BSOCKCORE::dump_bsock_msg(int sock, uint32_t msgno, const char *what, uint32_t rc, int32_t pktsize, uint32_t flags,
64 POOLMEM *amsg, int32_t amsglen)
65 {
66 char buf[54];
67 bool is_ascii;
68 int dbglvl = DT_NETWORK;
69
70 if (amsglen<0) {
71 Dmsg5(dbglvl, "0x%p: %s %d:%d SIGNAL=%s\n", this, what, sock, msgno, bnet_sig_to_ascii(amsglen));
72 } else if (flags & BNET_IS_CMD) {
73 /* this is a command */
74 int32_t command;
75 int32_t capacity;
76 int64_t counter;
77 int size;
78 char *hash;
79 char *block;
80 int hash_size;
81 unser_declare;
82 unser_begin(amsg, amsglen);
83 unser_int32(command);
84 switch (command) {
85 case BNET_CMD_GET_HASH:
86 case BNET_CMD_ACK_HASH:
87 case BNET_CMD_UNK_HASH:
88 unser_assign(hash, sizeof(int)); /* I use only 4 bytes of the hash */
89 unser_end(amsg, amsglen);
90 Dmsg6(dbglvl, "%s %d:%d %s len=%ld #%08x\n", what, sock, msgno, bnet_cmd_to_name(command), amsglen, hash2int(hash));
91 break;
92 case BNET_CMD_STO_BLOCK:
93 /* unfortunately we don't know the hash size and don't know the offset
94 * of the data, we'll bet this is the default size and check to not
95 * access mem after the end
96 */
97 hash_size=bhash_info(DEDUP_DEFAULT_HASH_ID, NULL);
98 unser_assign(hash, hash_size); /* I only display the 4 first byte of the hash */
99 size=amsglen-(BNET_CMD_SIZE+hash_size);
100 if (size>0) {
101 unser_assign(block, size);
102 unser_end(amsg, amsglen);
103 smartdump(block, size, buf, sizeof(buf)-9, &is_ascii);
104 } else {
105 buf[0] = '\0';
106 is_ascii = false;
107 }
108 if (is_ascii) {
109 Dmsg7(dbglvl, "%s %d:%d %s size=%d #%08x \"%s\"\n", what, sock, msgno, bnet_cmd_to_name(command), size, hash2int(hash), buf);
110 } else {
111 Dmsg7(dbglvl, "%s %d:%d %s size=%d #%08x %s\n", what, sock, msgno, bnet_cmd_to_name(command), size, hash2int(hash), buf);
112 }
113 break;
114 case BNET_CMD_REC_ACK:
115 unser_int32(capacity);
116 unser_int64(counter);
117 unser_end(amsg, amsglen);
118 Dmsg6(dbglvl, "%s %d:%d %s cnt=%lld cap=%ld\n", what, sock, msgno, bnet_cmd_to_name(command), counter, capacity);
119 break;
120 case BNET_CMD_NONE:
121 case BNET_CMD_STP_THREAD:
122 default:
123 Dmsg5(dbglvl, "%s %d:%d %s len=%ld\n", what, sock, msgno, bnet_cmd_to_name(command), amsglen);
124 break;
125 }
126 } else {
127 // data
128 smartdump(amsg, amsglen, buf, sizeof(buf)-9, &is_ascii);
129 if (is_ascii) {
130 Dmsg6(dbglvl, "0x%p: %s %d:%d len=%d \"%s\"\n", this, what, sock, msgno, amsglen, buf);
131 } else {
132 Dmsg6(dbglvl, "0x%p: %s %d:%d len=%d %s\n", this, what, sock, msgno, amsglen, buf);
133 }
134 }
135 }
136
BSOCKCallback()137 BSOCKCallback::BSOCKCallback()
138 {
139 }
140
~BSOCKCallback()141 BSOCKCallback::~BSOCKCallback()
142 {
143 }
144
145 /*
146 * Default constructor does class initialization.
147 */
BSOCKCORE()148 BSOCKCORE::BSOCKCORE() :
149 msg(NULL),
150 errmsg(NULL),
151 res(NULL),
152 tls(NULL),
153 src_addr(NULL),
154 read_seqno(0),
155 in_msg_no(0),
156 out_msg_no(0),
157 pout_msg_no(NULL),
158 msglen(0),
159 timer_start(0),
160 timeout(0),
161 m_fd(-1),
162 b_errno(0),
163 m_blocking(0),
164 errors(0),
165 m_suppress_error_msgs(false),
166 send_hook_cb(NULL),
167 m_next(NULL),
168 m_jcr(NULL),
169 pm_rmutex(NULL),
170 pm_wmutex(NULL),
171 m_who(NULL),
172 m_host(NULL),
173 m_port(0),
174 m_tid(NULL),
175 m_flags(0),
176 m_timed_out(false),
177 m_terminated(false),
178 m_closed(false),
179 m_duped(false),
180 m_use_locking(false),
181 m_bwlimit(0),
182 m_bandwidth(0),
183 m_nb_bytes(0),
184 m_last_tick(0),
185 m_rtt(0)
186 {
187 pthread_mutex_init(&m_rmutex, NULL);
188 pthread_mutex_init(&m_wmutex, NULL);
189 pthread_mutex_init(&m_mmutex, NULL);
190 bmemzero(&peer_addr, sizeof(peer_addr));
191 bmemzero(&client_addr, sizeof(client_addr));
192 init();
193 };
194
195 /*
196 * Default destructor releases resources.
197 */
~BSOCKCORE()198 BSOCKCORE::~BSOCKCORE()
199 {
200 Dmsg0(BSOCKCORE_DEBUG_LVL, "BSOCKCORE::~BSOCKCORE()\n");
201 _destroy();
202 };
203
204 /*
205 * Initialization method.
206 */
init()207 void BSOCKCORE::init()
208 {
209 m_master = this;
210 set_closed();
211 set_terminated();
212 m_blocking = 1;
213 msg = get_pool_memory(PM_BSOCK);
214 errmsg = get_pool_memory(PM_MESSAGE);
215 timeout = BSOCKCORE_TIMEOUT;
216 pout_msg_no = &out_msg_no;
217 }
218
free_tls()219 void BSOCKCORE::free_tls()
220 {
221 free_tls_connection(this->tls);
222 this->tls = NULL;
223 }
224
225 /*
226 * Try to connect to host for max_retry_time at retry_time intervals.
227 * Note, you must have called the constructor prior to calling
228 * this routine.
229 */
connect(JCR * jcr,int retry_interval,utime_t max_retry_time,utime_t heart_beat,const char * name,char * host,char * service,int port,int verbose)230 bool BSOCKCORE::connect(JCR * jcr, int retry_interval, utime_t max_retry_time,
231 utime_t heart_beat,
232 const char *name, char *host, char *service, int port,
233 int verbose)
234 {
235 bool ok = false;
236 int i;
237 int fatal = 0;
238 time_t begin_time = time(NULL);
239 time_t now;
240 btimer_t *tid = NULL;
241
242 /* Try to trap out of OS call when time expires */
243 if (max_retry_time) {
244 tid = start_thread_timer(jcr, pthread_self(), (uint32_t)max_retry_time);
245 }
246
247 for (i = 0; !open(jcr, name, host, service, port, heart_beat, &fatal);
248 i -= retry_interval) {
249 berrno be;
250 if (fatal || (jcr && job_canceled(jcr))) {
251 goto bail_out;
252 }
253 Dmsg4(50, "Unable to connect to %s on %s:%d. ERR=%s\n",
254 name, host, port, be.bstrerror());
255 if (i < 0) {
256 i = 60 * 5; /* complain again in 5 minutes */
257 if (verbose)
258 Qmsg4(jcr, M_WARNING, 0, _(
259 "Could not connect to %s on %s:%d. ERR=%s\n"
260 "Retrying ...\n"), name, host, port, be.bstrerror());
261 }
262 bmicrosleep(retry_interval, 0);
263 now = time(NULL);
264 if (begin_time + max_retry_time <= now) {
265 Qmsg4(jcr, M_FATAL, 0, _("Unable to connect to %s on %s:%d. ERR=%s\n"),
266 name, host, port, be.bstrerror());
267 goto bail_out;
268 }
269 }
270 ok = true;
271
272 bail_out:
273 if (tid) {
274 stop_thread_timer(tid);
275 }
276 return ok;
277 }
278
279 /*
280 * Finish initialization of the packet structure.
281 */
fin_init(JCR * jcr,int sockfd,const char * who,const char * host,int port,struct sockaddr * lclient_addr)282 void BSOCKCORE::fin_init(JCR * jcr, int sockfd, const char *who, const char *host, int port,
283 struct sockaddr *lclient_addr)
284 {
285 Dmsg3(100, "who=%s host=%s port=%d\n", who, host, port);
286 m_fd = sockfd;
287 if (m_who) {
288 free(m_who);
289 }
290 if (m_host) {
291 free(m_host);
292 }
293 set_who(bstrdup(who));
294 set_host(bstrdup(host));
295 set_port(port);
296 memcpy(&client_addr, lclient_addr, sizeof(client_addr));
297 set_jcr(jcr);
298 }
299
300 /*
301 * Copy the address from the configuration dlist that gets passed in
302 */
set_source_address(dlist * src_addr_list)303 void BSOCKCORE::set_source_address(dlist *src_addr_list)
304 {
305 IPADDR *addr = NULL;
306
307 // delete the object we already have, if it's allocated
308 if (src_addr) {
309 /* TODO: Why free() instead of delete as src_addr is a IPADDR class */
310 free( src_addr);
311 src_addr = NULL;
312 }
313
314 if (src_addr_list) {
315 addr = (IPADDR*) src_addr_list->first();
316 src_addr = New( IPADDR(*addr));
317 }
318 }
319
320 /*
321 * Open a TCP connection to the server
322 * Returns true when connection was successful or false otherwise.
323 */
open(JCR * jcr,const char * name,char * host,char * service,int port,utime_t heart_beat,int * fatal)324 bool BSOCKCORE::open(JCR *jcr, const char *name, char *host, char *service,
325 int port, utime_t heart_beat, int *fatal)
326 {
327 int sockfd = -1;
328 dlist *addr_list;
329 IPADDR *ipaddr;
330 bool connected = false;
331 int turnon = 1;
332 const char *errstr;
333 int save_errno = 0;
334
335 /*
336 * Fill in the structure serv_addr with the address of
337 * the server that we want to connect with.
338 */
339 if ((addr_list = bnet_host2ipaddrs(host, 0, &errstr)) == NULL) {
340 /* Note errstr is not malloc'ed */
341 Qmsg2(jcr, M_ERROR, 0, _("gethostbyname() for host \"%s\" failed: ERR=%s\n"),
342 host, errstr);
343 Dmsg2(100, "bnet_host2ipaddrs() for host %s failed: ERR=%s\n",
344 host, errstr);
345 *fatal = 1;
346 return false;
347 }
348
349 remove_duplicate_addresses(addr_list);
350 foreach_dlist(ipaddr, addr_list) {
351 ipaddr->set_port_net(htons(port));
352 char allbuf[256 * 10];
353 char curbuf[256];
354 Dmsg2(100, "Current %sAll %s\n",
355 ipaddr->build_address_str(curbuf, sizeof(curbuf)),
356 build_addresses_str(addr_list, allbuf, sizeof(allbuf)));
357 /* Open a TCP socket */
358 if ((sockfd = socket(ipaddr->get_family(), SOCK_STREAM|SOCK_CLOEXEC, 0)) < 0) {
359 berrno be;
360 save_errno = errno;
361 switch (errno) {
362 #ifdef EAFNOSUPPORT
363 case EAFNOSUPPORT:
364 /*
365 * The name lookup of the host returned an address in a protocol family
366 * we don't support. Suppress the error and try the next address.
367 */
368 break;
369 #endif
370 #ifdef EPROTONOSUPPORT
371 /* See above comments */
372 case EPROTONOSUPPORT:
373 break;
374 #endif
375 #ifdef EPROTOTYPE
376 /* See above comments */
377 case EPROTOTYPE:
378 break;
379 #endif
380 default:
381 *fatal = 1;
382 Qmsg3(jcr, M_ERROR, 0, _("Socket open error. proto=%d port=%d. ERR=%s\n"),
383 ipaddr->get_family(), ipaddr->get_port_host_order(), be.bstrerror());
384 Pmsg3(300, _("Socket open error. proto=%d port=%d. ERR=%s\n"),
385 ipaddr->get_family(), ipaddr->get_port_host_order(), be.bstrerror());
386 break;
387 }
388 continue;
389 }
390
391 /* Bind to the source address if it is set */
392 if (src_addr) {
393 /* It is not required to use SO_REUSEADDR here as DirSourceAddress &
394 * FdSourceAddress don't force the port of the source, only the address
395 */
396 if (bind(sockfd, src_addr->get_sockaddr(), src_addr->get_sockaddr_len()) < 0) {
397 berrno be;
398 save_errno = errno;
399 *fatal = 1;
400 Qmsg2(jcr, M_ERROR, 0, _("Source address bind error. proto=%d. ERR=%s\n"),
401 src_addr->get_family(), be.bstrerror() );
402 Pmsg2(000, _("Source address bind error. proto=%d. ERR=%s\n"),
403 src_addr->get_family(), be.bstrerror() );
404 if (sockfd >= 0) socketClose(sockfd);
405 continue;
406 }
407 }
408
409 /*
410 * Keep socket from timing out from inactivity
411 */
412 if (setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, (sockopt_val_t)&turnon, sizeof(turnon)) < 0) {
413 berrno be;
414 Qmsg1(jcr, M_WARNING, 0, _("Cannot set SO_KEEPALIVE on socket: %s\n"),
415 be.bstrerror());
416 }
417 #if defined(TCP_KEEPIDLE)
418 if (heart_beat) {
419 int opt = heart_beat;
420 if (setsockopt(sockfd, SOL_TCP, TCP_KEEPIDLE, (sockopt_val_t)&opt, sizeof(opt)) < 0) {
421 berrno be;
422 Qmsg1(jcr, M_WARNING, 0, _("Cannot set TCP_KEEPIDLE on socket: %s\n"),
423 be.bstrerror());
424 }
425 }
426 #endif
427
428 /* connect to server */
429 if (::connect(sockfd, ipaddr->get_sockaddr(), ipaddr->get_sockaddr_len()) < 0) {
430 save_errno = errno;
431 if (sockfd >= 0) socketClose(sockfd);
432 continue;
433 }
434 *fatal = 0;
435 connected = true;
436 break;
437 }
438
439 if (!connected) {
440 berrno be;
441 free_addresses(addr_list);
442 errno = save_errno | b_errno_win32;
443 Dmsg4(50, "Could not connect to server %s %s:%d. ERR=%s\n",
444 name, host, port, be.bstrerror());
445 return false;
446 }
447 /*
448 * Keep socket from timing out from inactivity
449 * Do this a second time out of paranoia
450 */
451 if (setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, (sockopt_val_t)&turnon, sizeof(turnon)) < 0) {
452 berrno be;
453 Qmsg1(jcr, M_WARNING, 0, _("Cannot set SO_KEEPALIVE on socket: %s\n"),
454 be.bstrerror());
455 }
456 fin_init(jcr, sockfd, name, host, port, ipaddr->get_sockaddr());
457 free_addresses(addr_list);
458
459 /* Clean the packet a bit */
460 m_closed = false;
461 m_duped = false;
462 // Moved to BSOCK m_spool = false;
463 m_use_locking = false;
464 m_timed_out = false;
465 m_terminated = false;
466 m_suppress_error_msgs = false;
467 errors = 0;
468 m_blocking = 0;
469
470 #ifdef INET6_ADDRSTRLEN
471 char info[2*INET6_ADDRSTRLEN+20];
472 Dmsg4(50, "OK connected to server %s %s:%d. socket=%s\n",
473 name, host, port, get_info(info, sizeof(info)));
474 #endif
475 return true;
476 }
477
478 /*
479 * Force read/write to use locking
480 */
set_locking()481 bool BSOCKCORE::set_locking()
482 {
483 int stat;
484 if (m_use_locking) {
485 return true; /* already set */
486 }
487 pm_rmutex = &m_rmutex;
488 pm_wmutex = &m_wmutex;
489 if ((stat = pthread_mutex_init(pm_rmutex, NULL)) != 0) {
490 berrno be;
491 Qmsg(m_jcr, M_FATAL, 0, _("Could not init bsockcore read mutex. ERR=%s\n"),
492 be.bstrerror(stat));
493 return false;
494 }
495 if ((stat = pthread_mutex_init(pm_wmutex, NULL)) != 0) {
496 berrno be;
497 Qmsg(m_jcr, M_FATAL, 0, _("Could not init bsockcore write mutex. ERR=%s\n"),
498 be.bstrerror(stat));
499 return false;
500 }
501 if ((stat = pthread_mutex_init(&m_mmutex, NULL)) != 0) {
502 berrno be;
503 Qmsg(m_jcr, M_FATAL, 0, _("Could not init bsockcore attribute mutex. ERR=%s\n"),
504 be.bstrerror(stat));
505 return false;
506 }
507 m_use_locking = true;
508 return true;
509 }
510
clear_locking()511 void BSOCKCORE::clear_locking()
512 {
513 if (!m_use_locking || m_duped) {
514 return;
515 }
516 m_use_locking = false;
517 pthread_mutex_destroy(pm_rmutex);
518 pthread_mutex_destroy(pm_wmutex);
519 pthread_mutex_destroy(&m_mmutex);
520 pm_rmutex = NULL;
521 pm_wmutex = NULL;
522 return;
523 }
524
525 /*
526 * Send a message over the network. Everything is sent in one write request.
527 *
528 * Returns: false on failure
529 * true on success
530 */
send()531 bool BSOCKCORE::send()
532 {
533 int32_t rc;
534 bool ok = true;
535 bool locked = false;
536
537 if (is_closed()) {
538 if (!m_suppress_error_msgs) {
539 Qmsg0(m_jcr, M_ERROR, 0, _("Socket is closed\n"));
540 }
541 return false;
542 }
543 if (errors) {
544 if (!m_suppress_error_msgs) {
545 Qmsg4(m_jcr, M_ERROR, 0, _("Socket has errors=%d on call to %s:%s:%d\n"),
546 errors, m_who, m_host, m_port);
547 }
548 return false;
549 }
550 if (is_terminated()) {
551 if (!m_suppress_error_msgs) {
552 Qmsg4(m_jcr, M_ERROR, 0, _("BSOCKCORE send while terminated=%d on call to %s:%s:%d\n"),
553 is_terminated(), m_who, m_host, m_port);
554 }
555 return false;
556 }
557
558 if (msglen > 4000000) {
559 if (!m_suppress_error_msgs) {
560 Qmsg4(m_jcr, M_ERROR, 0,
561 _("Socket has insane msglen=%d on call to %s:%s:%d\n"),
562 msglen, m_who, m_host, m_port);
563 }
564 return false;
565 }
566
567 if (send_hook_cb) {
568 if (!send_hook_cb->bsock_send_cb()) {
569 Dmsg3(1, "Flowcontrol failure on %s:%s:%d\n", m_who, m_host, m_port);
570 Qmsg3(m_jcr, M_ERROR, 0, _("Flowcontrol failure on %s:%s:%d\n"), m_who, m_host, m_port);
571 return false;
572 }
573 }
574 if (m_use_locking) {
575 pP(pm_wmutex);
576 locked = true;
577 }
578
579 (*pout_msg_no)++; /* increment message number */
580
581 /* send data packet */
582 timer_start = watchdog_time; /* start timer */
583 clear_timed_out();
584 /* Full I/O done in one write */
585 rc = write_nbytes(msg, msglen);
586 if (chk_dbglvl(DT_NETWORK|1900)) dump_bsock_msg(m_fd, *pout_msg_no, "SEND", rc, msglen, m_flags, msg, msglen);
587 timer_start = 0; /* clear timer */
588 if (rc != msglen) {
589 errors++;
590 if (errno == 0) {
591 b_errno = EIO;
592 } else {
593 b_errno = errno;
594 }
595 if (rc < 0) {
596 if (!m_suppress_error_msgs) {
597 Qmsg5(m_jcr, M_ERROR, 0,
598 _("Write error sending %d bytes to %s:%s:%d: ERR=%s\n"),
599 msglen, m_who,
600 m_host, m_port, this->bstrerror());
601 }
602 } else {
603 Qmsg5(m_jcr, M_ERROR, 0,
604 _("Wrote %d bytes to %s:%s:%d, but only %d accepted.\n"),
605 msglen, m_who, m_host, m_port, rc);
606 }
607 ok = false;
608 }
609 // Dmsg4(000, "cmpr=%d ext=%d cmd=%d m_flags=0x%x\n", msglen&BNET_COMPRESSED?1:0,
610 // msglen&BNET_HDR_EXTEND?1:0, msglen&BNET_CMD_BIT?1:0, m_flags);
611 if (locked) pV(pm_wmutex);
612 return ok;
613 }
614
615 /*
616 * Format and send a message
617 * Returns: false on error
618 * true on success
619 */
fsend(const char * fmt,...)620 bool BSOCKCORE::fsend(const char *fmt, ...)
621 {
622 va_list arg_ptr;
623 int maxlen;
624
625 if (is_null(this)) {
626 return false; /* do not seg fault */
627 }
628 if (errors || is_terminated() || is_closed()) {
629 return false;
630 }
631 /* This probably won't work, but we vsnprintf, then if we
632 * get a negative length or a length greater than our buffer
633 * (depending on which library is used), the printf was truncated, so
634 * get a bigger buffer and try again.
635 */
636 for (;;) {
637 maxlen = sizeof_pool_memory(msg) - 1;
638 va_start(arg_ptr, fmt);
639 msglen = bvsnprintf(msg, maxlen, fmt, arg_ptr);
640 va_end(arg_ptr);
641 if (msglen >= 0 && msglen < (maxlen - 5)) {
642 break;
643 }
644 msg = realloc_pool_memory(msg, maxlen + maxlen / 2);
645 }
646 return send();
647 }
648
649 /*
650 * Receive a data from the other end.
651 * The number of expected bytes in len.
652 * Returns number of bytes read (may return zero), the msglen is set accordingly.
653 * Returns -1 on error so msglen will be zero.
654 */
recvn(int len)655 int32_t BSOCKCORE::recvn(int len)
656 {
657 /* The method has to be redesigned from scratch */
658 int32_t nbytes;
659 bool locked = false;
660
661 msglen = nbytes = 0;
662 msg[msglen] = 0;
663 if (errors || is_terminated() || is_closed()) {
664 /* error, cannot receive */
665 return -1;
666 }
667
668 if (len > 0) {
669 /* do read only when len > 0 */
670 if (m_use_locking) {
671 pP(pm_rmutex);
672 locked = true;
673 }
674 read_seqno++; /* bump sequence number */
675 timer_start = watchdog_time; /* set start wait time */
676 clear_timed_out();
677 /* Make sure the buffer is big enough + one byte for EOS */
678 if (len >= (int32_t) sizeof_pool_memory(msg)) {
679 msg = realloc_pool_memory(msg, len + 100);
680 }
681 timer_start = watchdog_time; /* set start wait time */
682 clear_timed_out();
683 if ((nbytes = read_nbytes(msg, len)) <= 0) {
684 timer_start = 0; /* clear timer */
685 /* probably pipe broken because client died */
686 if (errno == 0) {
687 b_errno = ENODATA;
688 } else {
689 b_errno = errno;
690 }
691 nbytes = -1;
692 errors++;
693 msglen = 0; /* assume hard EOF received */
694 Qmsg4(m_jcr, M_ERROR, 0, _("Read error from %s:%s:%d: ERR=%s\n"),
695 m_who, m_host, m_port, this->bstrerror());
696 goto bailout;
697 }
698 timer_start = 0; /* clear timer */
699 in_msg_no++;
700 msglen = nbytes;
701 /*
702 * always add a zero by to properly terminate any
703 * string that was send to us. Note, we ensured above that the
704 * buffer is at least one byte longer than the message length.
705 */
706 msg[nbytes] = 0; /* terminate in case it is a string */
707 /*
708 * The following uses *lots* of resources so turn it on only for
709 * serious debugging.
710 */
711 Dsm_check(300);
712 }
713
714 bailout:
715 if ((chk_dbglvl(DT_NETWORK|1900))) dump_bsock_msg(m_fd, read_seqno, "GRECV", nbytes, len, m_flags, msg, msglen);
716
717 if (locked) pV(pm_rmutex);
718 return nbytes; /* return actual length of message or -1 */
719 }
720
721 /*
722 * Return the string for the error that occurred
723 * on the socket. Only the first error is retained.
724 */
bstrerror()725 const char *BSOCKCORE::bstrerror()
726 {
727 berrno be;
728 if (errmsg == NULL) {
729 errmsg = get_pool_memory(PM_MESSAGE);
730 }
731 if (b_errno == 0) {
732 pm_strcpy(errmsg, "I/O Error");
733 } else {
734 pm_strcpy(errmsg, be.bstrerror(b_errno));
735 }
736 return errmsg;
737 }
738
get_peer(char * buf,socklen_t buflen)739 int BSOCKCORE::get_peer(char *buf, socklen_t buflen)
740 {
741 #if defined(HAVE_INET_NTOP)
742 if (peer_addr.sin_family == 0) {
743 socklen_t salen = sizeof(peer_addr);
744 int rval = (getpeername)(m_fd, (struct sockaddr *)&peer_addr, &salen);
745 if (rval < 0) return rval;
746 }
747 if (!inet_ntop(peer_addr.sin_family, &peer_addr.sin_addr, buf, buflen))
748 return -1;
749
750 return 0;
751 #else
752 return -1;
753 #endif
754 }
755
756 /* return the address and port of the source and the destination of a socket
757 * in the format S.S.S.S.port_S:D.D.D.D.port_D
758 */
get_info(char * buf,int buflen)759 char* BSOCKCORE::get_info(char *buf, int buflen)
760 {
761 #ifndef HAVE_WIN32
762 #ifdef INET6_ADDRSTRLEN
763 socklen_t len;
764 struct sockaddr_storage addr;
765 char ipstr_s[INET6_ADDRSTRLEN], ipstr_d[INET6_ADDRSTRLEN];
766 int port_s, port_d;
767
768 len = sizeof addr;
769 if (getsockname(m_fd, (struct sockaddr*)&addr, &len) != 0) {
770 goto error;
771 }
772
773 // source address, deal with both IPv4 and IPv6:
774 if (addr.ss_family == AF_INET) {
775 struct sockaddr_in *s = (struct sockaddr_in *)&addr;
776 port_s = ntohs(s->sin_port);
777 inet_ntop(AF_INET, &s->sin_addr, ipstr_s, sizeof ipstr_s);
778 } else { // AF_INET6
779 struct sockaddr_in6 *s = (struct sockaddr_in6 *)&addr;
780 port_s = ntohs(s->sin6_port);
781 inet_ntop(AF_INET6, &s->sin6_addr, ipstr_s, sizeof ipstr_s);
782 }
783
784 len = sizeof addr;
785 if (getpeername(m_fd, (struct sockaddr*)&addr, &len) != 0) {
786 goto error;
787 }
788
789 // destination address, deal with both IPv4 and IPv6:
790 if (addr.ss_family == AF_INET) {
791 struct sockaddr_in *s = (struct sockaddr_in *)&addr;
792 port_d = ntohs(s->sin_port);
793 inet_ntop(AF_INET, &s->sin_addr, ipstr_d, sizeof ipstr_s);
794 } else { // AF_INET6
795 struct sockaddr_in6 *s = (struct sockaddr_in6 *)&addr;
796 port_d = ntohs(s->sin6_port);
797 inet_ntop(AF_INET6, &s->sin6_addr, ipstr_d, sizeof ipstr_s);
798 }
799
800 bsnprintf(buf, buflen, "%s.%d:%s.%d s=0x%p", ipstr_s, port_s, ipstr_d, port_d, this);
801 return buf;
802 error:
803 #endif /* INET6 */
804 #endif /* !WIN32 */
805 buf[0]='\0';
806 return buf;
807 }
808 /*
809 * Set the network buffer size, suggested size is in size.
810 * Actual size obtained is returned in bs->msglen
811 *
812 * Returns: false on failure
813 * true on success
814 */
set_buffer_size(uint32_t size,int rw)815 bool BSOCKCORE::set_buffer_size(uint32_t size, int rw)
816 {
817 uint32_t dbuf_size, start_size;
818
819 #if defined(IP_TOS) && defined(IPTOS_THROUGHPUT)
820 int opt;
821 opt = IPTOS_THROUGHPUT;
822 setsockopt(m_fd, IPPROTO_IP, IP_TOS, (sockopt_val_t)&opt, sizeof(opt));
823 #endif
824
825 if (size != 0) {
826 dbuf_size = size;
827 } else {
828 dbuf_size = DEFAULT_NETWORK_BUFFER_SIZE;
829 }
830 start_size = dbuf_size;
831 /* The extra 512 can hold data such as Sparse/Offset pointer */
832 if ((msg = realloc_pool_memory(msg, dbuf_size + 512)) == NULL) {
833 Qmsg0(get_jcr(), M_FATAL, 0, _("Could not malloc BSOCKCORE data buffer\n"));
834 return false;
835 }
836
837 /*
838 * If user has not set the size, use the OS default -- i.e. do not
839 * try to set it. This allows sys admins to set the size they
840 * want in the OS, and Bacula will comply. See bug #1493
841 */
842 if (size == 0) {
843 msglen = dbuf_size;
844 return true;
845 }
846
847 if (rw & BNET_SETBUF_READ) {
848 while ((dbuf_size > TAPE_BSIZE) && (setsockopt(m_fd, SOL_SOCKET,
849 SO_RCVBUF, (sockopt_val_t) & dbuf_size, sizeof(dbuf_size)) < 0)) {
850 berrno be;
851 Qmsg1(get_jcr(), M_ERROR, 0, _("sockopt error: %s\n"), be.bstrerror());
852 dbuf_size -= TAPE_BSIZE;
853 }
854 Dmsg1(200, "set network buffer size=%d\n", dbuf_size);
855 if (dbuf_size != start_size) {
856 Qmsg1(get_jcr(), M_WARNING, 0,
857 _("Warning network buffer = %d bytes not max size.\n"), dbuf_size);
858 }
859 }
860 if (size != 0) {
861 dbuf_size = size;
862 } else {
863 dbuf_size = DEFAULT_NETWORK_BUFFER_SIZE;
864 }
865 start_size = dbuf_size;
866 if (rw & BNET_SETBUF_WRITE) {
867 while ((dbuf_size > TAPE_BSIZE) && (setsockopt(m_fd, SOL_SOCKET,
868 SO_SNDBUF, (sockopt_val_t) & dbuf_size, sizeof(dbuf_size)) < 0)) {
869 berrno be;
870 Qmsg1(get_jcr(), M_ERROR, 0, _("sockopt error: %s\n"), be.bstrerror());
871 dbuf_size -= TAPE_BSIZE;
872 }
873 Dmsg1(900, "set network buffer size=%d\n", dbuf_size);
874 if (dbuf_size != start_size) {
875 Qmsg1(get_jcr(), M_WARNING, 0,
876 _("Warning network buffer = %d bytes not max size.\n"), dbuf_size);
877 }
878 }
879
880 msglen = dbuf_size;
881 return true;
882 }
883
884 /*
885 * Set socket non-blocking
886 * Returns previous socket flag
887 */
set_nonblocking()888 int BSOCKCORE::set_nonblocking()
889 {
890 int oflags;
891
892 /* Get current flags */
893 if ((oflags = fcntl(m_fd, F_GETFL, 0)) < 0) {
894 berrno be;
895 Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_GETFL error. ERR=%s\n"), be.bstrerror());
896 }
897
898 /* Set O_NONBLOCK flag */
899 if ((fcntl(m_fd, F_SETFL, oflags|O_NONBLOCK)) < 0) {
900 berrno be;
901 Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_SETFL error. ERR=%s\n"), be.bstrerror());
902 }
903
904 m_blocking = 0;
905 return oflags;
906 }
907
908 /*
909 * Set socket blocking
910 * Returns previous socket flags
911 */
set_blocking()912 int BSOCKCORE::set_blocking()
913 {
914 int oflags;
915 /* Get current flags */
916 if ((oflags = fcntl(m_fd, F_GETFL, 0)) < 0) {
917 berrno be;
918 Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_GETFL error. ERR=%s\n"), be.bstrerror());
919 }
920
921 /* Set O_NONBLOCK flag */
922 if ((fcntl(m_fd, F_SETFL, oflags & ~O_NONBLOCK)) < 0) {
923 berrno be;
924 Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_SETFL error. ERR=%s\n"), be.bstrerror());
925 }
926
927 m_blocking = 1;
928 return oflags;
929 }
930
set_killable(bool killable)931 void BSOCKCORE::set_killable(bool killable)
932 {
933 if (m_jcr) {
934 m_jcr->set_killable(killable);
935 }
936 }
937
938 /*
939 * Restores socket flags
940 */
restore_blocking(int flags)941 void BSOCKCORE::restore_blocking (int flags)
942 {
943 if ((fcntl(m_fd, F_SETFL, flags)) < 0) {
944 berrno be;
945 Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_SETFL error. ERR=%s\n"), be.bstrerror());
946 }
947
948 m_blocking = (flags & O_NONBLOCK) ? true : false;
949 }
950
951 /*
952 * Wait for a specified time for data to appear on
953 * the BSOCKCORE connection.
954 *
955 * Returns: 1 if data available
956 * 0 if timeout
957 * -1 if error
958 */
wait_data(int sec,int msec)959 int BSOCKCORE::wait_data(int sec, int msec)
960 {
961 for (;;) {
962 switch (fd_wait_data(m_fd, WAIT_READ, sec, msec)) {
963 case 0: /* timeout */
964 b_errno = 0;
965 return 0;
966 case -1:
967 b_errno = errno;
968 if (errno == EINTR) {
969 continue;
970 }
971 return -1; /* error return */
972 default:
973 b_errno = 0;
974 #ifdef HAVE_TLS
975 if (this->tls && !tls_bsock_probe(this)) {
976 continue; /* false alarm, maybe a session key negotiation in progress on the socket */
977 }
978 #endif
979 return 1;
980 }
981 }
982 }
983
984 /*
985 * As above, but returns on interrupt
986 */
wait_data_intr(int sec,int msec)987 int BSOCKCORE::wait_data_intr(int sec, int msec)
988 {
989 switch (fd_wait_data(m_fd, WAIT_READ, sec, msec)) {
990 case 0: /* timeout */
991 b_errno = 0;
992 return 0;
993 case -1:
994 b_errno = errno;
995 return -1; /* error return */
996 default:
997 b_errno = 0;
998 #ifdef HAVE_TLS
999 if (this->tls && !tls_bsock_probe(this)) {
1000 /* maybe a session key negotiation waked up the socket */
1001 return 0;
1002 }
1003 #endif
1004 break;
1005 }
1006 return 1;
1007 }
1008
1009 /*
1010 * This routine closes the current BSOCKCORE.
1011 * It does not delete the socket packet
1012 * resources, which are released in bsock->destroy().
1013 */
1014 #ifndef SHUT_RDWR
1015 #define SHUT_RDWR 2
1016 #endif
1017
1018 /*
1019 * The JCR is canceled, set terminate for chained BSOCKCOREs starting from master
1020 */
cancel()1021 void BSOCKCORE::cancel()
1022 {
1023 master_lock();
1024 for (BSOCKCORE *next = m_master; next != NULL; next = next->m_next) {
1025 if (!next->m_closed) {
1026 next->m_terminated = true;
1027 next->m_timed_out = true;
1028 }
1029 }
1030 master_unlock();
1031 }
1032
1033 /*
1034 * Note, this routine closes the socket, but leaves the
1035 * bsockcore memory in place.
1036 * every thread is responsible of closing and destroying its own duped or not
1037 * duped BSOCKCORE
1038 */
close()1039 void BSOCKCORE::close()
1040 {
1041 BSOCKCORE *bsock = this;
1042
1043 Dmsg0(BSOCKCORE_DEBUG_LVL, "BSOCKCORE::close()\n");
1044 if (bsock->is_closed()) {
1045 return;
1046 }
1047 if (!m_duped) {
1048 clear_locking();
1049 }
1050 bsock->set_closed();
1051 bsock->set_terminated();
1052 if (!bsock->m_duped) {
1053 /* Shutdown tls cleanly. */
1054 if (bsock->tls) {
1055 tls_bsock_shutdown(bsock);
1056 free_tls_connection(bsock->tls);
1057 bsock->tls = NULL;
1058 }
1059 #ifdef HAVE_WIN32
1060 if (!bsock->is_timed_out()) {
1061 win_close_wait(bsock->m_fd); /* Ensure that data is not discarded */
1062 }
1063 #else
1064 if (bsock->is_timed_out()) {
1065 shutdown(bsock->m_fd, SHUT_RDWR); /* discard any pending I/O */
1066 }
1067 #endif
1068 /* On Windows this discards data if we did not do a close_wait() */
1069 socketClose(bsock->m_fd); /* normal close */
1070 }
1071 return;
1072 }
1073
1074 /*
1075 * Destroy the socket (i.e. release all resources)
1076 */
_destroy()1077 void BSOCKCORE::_destroy()
1078 {
1079 Dmsg0(BSOCKCORE_DEBUG_LVL, "BSOCKCORE::_destroy()\n");
1080 this->close(); /* Ensure that socket is closed */
1081 if (msg) {
1082 free_pool_memory(msg);
1083 msg = NULL;
1084 } else {
1085 ASSERT2(1 == 0, "Two calls to destroy socket"); /* double destroy */
1086 }
1087 if (errmsg) {
1088 free_pool_memory(errmsg);
1089 errmsg = NULL;
1090 }
1091 if (m_who) {
1092 free(m_who);
1093 m_who = NULL;
1094 }
1095 if (m_host) {
1096 free(m_host);
1097 m_host = NULL;
1098 }
1099 if (src_addr) {
1100 free(src_addr);
1101 src_addr = NULL;
1102 }
1103 }
1104
1105 /*
1106 * Destroy the socket (i.e. release all resources)
1107 * including duped sockets.
1108 * should not be called from duped BSOCKCORE
1109 */
destroy()1110 void BSOCKCORE::destroy()
1111 {
1112 Dmsg0(BSOCKCORE_DEBUG_LVL, "BSOCKCORE::destroy()\n");
1113 ASSERTD(reinterpret_cast<uintptr_t>(m_next) != 0xaaaaaaaaaaaaaaaa, "BSOCKCORE::destroy() already called\n")
1114 ASSERTD(this == m_master, "BSOCKCORE::destroy() called by a non master BSOCKCORE\n")
1115 ASSERTD(!m_duped, "BSOCKCORE::destroy() called by a duped BSOCKCORE\n")
1116 /* I'm the master I must destroy() all the duped BSOCKCOREs */
1117 master_lock();
1118 BSOCKCORE *ahead;
1119 for (BSOCKCORE *next = m_next; next != NULL; next = ahead) {
1120 ahead = next->m_next;
1121 Dmsg1(BSOCKCORE_DEBUG_LVL, "BSOCKCORE::destroy():delete(%p)\n", next);
1122 delete(next);
1123 }
1124 master_unlock();
1125 Dmsg0(BSOCKCORE_DEBUG_LVL, "BSOCKCORE::destroy():delete(this)\n");
1126 delete(this);
1127 }
1128
1129 /* Try to limit the bandwidth of a network connection
1130 */
control_bwlimit(int bytes)1131 void BSOCKCORE::control_bwlimit(int bytes)
1132 {
1133 btime_t now, temp;
1134 if (bytes == 0) {
1135 return;
1136 }
1137
1138 now = get_current_btime(); /* microseconds */
1139 temp = now - m_last_tick; /* microseconds */
1140
1141 m_nb_bytes += bytes;
1142
1143 if (temp < 0 || temp > 10000000) { /* Take care of clock problems (>10s) or back in time */
1144 m_nb_bytes = bytes;
1145 m_last_tick = now;
1146 return;
1147 }
1148
1149 /* Less than 0.1ms since the last call, see the next time */
1150 if (temp < 100) {
1151 return;
1152 }
1153
1154 /* Remove what was authorised to be written in temp us */
1155 m_nb_bytes -= (int64_t)(temp * ((double)m_bwlimit / 1000000.0));
1156
1157 if (m_nb_bytes < 0) {
1158 m_nb_bytes = 0;
1159 }
1160
1161 /* What exceed should be converted in sleep time */
1162 int64_t usec_sleep = (int64_t)(m_nb_bytes /((double)m_bwlimit / 1000000.0));
1163 if (usec_sleep > 100) {
1164 bmicrosleep(usec_sleep/1000000, usec_sleep%1000000); /* TODO: Check that bmicrosleep slept enough or sleep again */
1165 m_last_tick = get_current_btime();
1166 m_nb_bytes = 0;
1167 } else {
1168 m_last_tick = now;
1169 }
1170 }
1171
1172 /*
1173 * Write nbytes to the network.
1174 * It may require several writes.
1175 */
1176
write_nbytes(char * ptr,int32_t nbytes)1177 int32_t BSOCKCORE::write_nbytes(char *ptr, int32_t nbytes)
1178 {
1179 int32_t nleft, nwritten;
1180
1181 #ifdef HAVE_TLS
1182 if (tls) {
1183 /* TLS enabled */
1184 return (tls_bsock_writen((BSOCK*)this, ptr, nbytes));
1185 }
1186 #endif /* HAVE_TLS */
1187
1188 nleft = nbytes;
1189 while (nleft > 0) {
1190 do {
1191 errno = 0;
1192 nwritten = socketWrite(m_fd, ptr, nleft);
1193 if (is_timed_out() || is_terminated()) {
1194 return -1;
1195 }
1196
1197 #ifdef HAVE_WIN32
1198 /*
1199 * We simulate errno on Windows for a socket
1200 * error in order to handle errors correctly.
1201 */
1202 if (nwritten == SOCKET_ERROR) {
1203 DWORD err = WSAGetLastError();
1204 nwritten = -1;
1205 if (err == WSAEINTR) {
1206 errno = EINTR;
1207 } else if (err == WSAEWOULDBLOCK) {
1208 errno = EAGAIN;
1209 } else {
1210 errno = EIO; /* some other error */
1211 }
1212 }
1213 #endif
1214
1215 } while (nwritten == -1 && errno == EINTR);
1216 /*
1217 * If connection is non-blocking, we will get EAGAIN, so
1218 * use select()/poll to keep from consuming all the CPU
1219 * and try again.
1220 */
1221 if (nwritten == -1 && errno == EAGAIN) {
1222 fd_wait_data(m_fd, WAIT_WRITE, 1, 0);
1223 continue;
1224 }
1225 if (nwritten <= 0) {
1226 return -1; /* error */
1227 }
1228 nleft -= nwritten;
1229 ptr += nwritten;
1230 if (use_bwlimit()) {
1231 control_bwlimit(nwritten);
1232 }
1233 }
1234 return nbytes - nleft;
1235 }
1236
1237 /*
 * Read nbytes from the network.
 * It is possible that the total number of bytes requires several
 * read requests.
1241 */
1242
read_nbytes(char * ptr,int32_t nbytes)1243 int32_t BSOCKCORE::read_nbytes(char *ptr, int32_t nbytes)
1244 {
1245 int32_t nleft, nread;
1246
1247 #ifdef HAVE_TLS
1248 if (tls) {
1249 /* TLS enabled */
1250 return (tls_bsock_readn((BSOCK*)this, ptr, nbytes));
1251 }
1252 #endif /* HAVE_TLS */
1253
1254 nleft = nbytes;
1255 while (nleft > 0) {
1256 errno = 0;
1257 nread = socketRead(m_fd, ptr, nleft);
1258 if (is_timed_out() || is_terminated()) {
1259 return -1;
1260 }
1261
1262 #ifdef HAVE_WIN32
1263 /*
1264 * We simulate errno on Windows for a socket
1265 * error in order to handle errors correctly.
1266 */
1267 if (nread == SOCKET_ERROR) {
1268 DWORD err = WSAGetLastError();
1269 nread = -1;
1270 if (err == WSAEINTR) {
1271 errno = EINTR;
1272 } else if (err == WSAEWOULDBLOCK) {
1273 errno = EAGAIN;
1274 } else {
1275 errno = EIO; /* some other error */
1276 }
1277 }
1278 #endif
1279
1280 if (nread == -1) {
1281 if (errno == EINTR) {
1282 continue;
1283 }
1284 if (errno == EAGAIN) {
1285 bmicrosleep(0, 20000); /* try again in 20ms */
1286 continue;
1287 }
1288 }
1289 if (nread <= 0) {
1290 return -1; /* error, or EOF */
1291 }
1292 nleft -= nread;
1293 ptr += nread;
1294 if (use_bwlimit()) {
1295 control_bwlimit(nread);
1296 }
1297 }
1298 return nbytes - nleft; /* return >= 0 */
1299 }
1300
1301 #ifdef HAVE_WIN32
1302 /*
 * closesocket is supposed to do a graceful disconnect under Windows
1304 * but it doesn't. Comments on http://msdn.microsoft.com/en-us/li
1305 * confirm this behaviour. DisconnectEx is required instead, but
1306 * that function needs to be retrieved via WS IOCTL
1307 */
1308 static void
win_close_wait(int fd)1309 win_close_wait(int fd)
1310 {
1311 int ret;
1312 GUID disconnectex_guid = WSAID_DISCONNECTEX;
1313 DWORD bytes_returned;
1314 LPFN_DISCONNECTEX DisconnectEx;
1315 ret = WSAIoctl(fd, SIO_GET_EXTENSION_FUNCTION_POINTER, &disconnectex_guid, sizeof(disconnectex_guid), &DisconnectEx, sizeof(DisconnectEx), &bytes_returned, NULL, NULL);
1316 Dmsg1(100, "WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER, WSAID_DISCONNECTEX) ret = %d\n", ret);
1317 if (!ret) {
1318 DisconnectEx(fd, NULL, 0, 0);
1319 }
1320 }
1321 #endif
1322
1323 #ifndef TEST_PROGRAM
1324 #define TEST_PROGRAM_A
1325 #endif
1326
dump()1327 void BSOCKCORE::dump()
1328 {
1329 #ifdef TEST_PROGRAM
1330 char ed1[50];
1331 Pmsg1(-1, "BSOCKCORE::dump(): %p\n", this);
1332 Pmsg1(-1, "\tmsg: %p\n", msg);
1333 Pmsg1(-1, "\terrmsg: %p\n", errmsg);
1334 Pmsg1(-1, "\tres: %p\n", res);
1335 Pmsg1(-1, "\ttls: %p\n", tls);
1336 Pmsg1(-1, "\tsrc_addr: %p\n", src_addr);
1337 Pmsg1(-1, "\tread_seqno: %s\n", edit_uint64(read_seqno, ed1));
1338 Pmsg1(-1, "\tin_msg_no: %s\n", edit_uint64(in_msg_no, ed1));
1339 Pmsg1(-1, "\tout_msg_no: %s\n", edit_uint64(out_msg_no, ed1));
1340 Pmsg1(-1, "\tpout_msg_no: %p\n", pout_msg_no);
1341 Pmsg1(-1, "\tmsglen: %s\n", edit_int64(msglen, ed1));
1342 Pmsg1(-1, "\ttimer_start: %ld\n", timer_start);
1343 Pmsg1(-1, "\ttimeout: %ld\n", timeout);
1344 Pmsg1(-1, "\tm_fd: %d\n", m_fd);
1345 Pmsg1(-1, "\tb_errno: %d\n", b_errno);
1346 Pmsg1(-1, "\tm_blocking: %d\n", m_blocking);
1347 Pmsg1(-1, "\terrors: %d\n", errors);
1348 Pmsg1(-1, "\tm_suppress_error_msgs: %s\n", m_suppress_error_msgs?"true":"false");
1349 // Pmsg1(0, "\tclient_addr:{ } struct sockaddr client_addr; /* client's IP address */
1350 // Pmsg1(0, "\tstruct sockaddr_in peer_addr; /* peer's IP address */
1351 Pmsg1(-1, "\tsend_hook_cb: %p\n", send_hook_cb);
1352 Pmsg1(-1, "\tm_master: %p\n", m_master);
1353 Pmsg1(-1, "\tm_next: %p\n", m_next);
1354 Pmsg1(-1, "\tm_jcr: %p\n", m_jcr);
1355 // pthread_mutex_t m_rmutex; /* for read locking if use_locking set */
1356 // pthread_mutex_t m_wmutex; /* for write locking if use_locking set */
1357 // mutable pthread_mutex_t m_mmutex; /* when accessing the master/next chain */
1358 // pthread_mutex_t *pm_rmutex; /* Pointer to the read mutex */
1359 // pthread_mutex_t *pm_wmutex; /* Pointer to the write mutex */
1360 Pmsg1(-1, "\tm_who: %p\n", m_who);
1361 Pmsg1(-1, "\tm_host: %p\n", m_host);
1362 Pmsg1(-1, "\tm_port: %d\n", m_port);
1363 Pmsg1(-1, "\tm_tid: %p\n", m_tid);
1364 Pmsg1(-1, "\tm_flags: %s\n", edit_uint64(m_flags, ed1));
1365 Pmsg1(-1, "\tm_timed_out: %s\n", m_timed_out?"true":"false");
1366 Pmsg1(-1, "\tm_terminated: %s\n", m_terminated?"true":"false");
1367 Pmsg1(-1, "\tm_closed: %s\n", m_closed?"true":"false");
1368 Pmsg1(-1, "\tm_duped: %s\n", m_duped?"true":"false");
1369 Pmsg1(-1, "\tm_use_locking: %s\n", m_use_locking?"true":"false");
1370 Pmsg1(-1, "\tm_bwlimit: %s\n", edit_int64(m_bwlimit, ed1));
1371 Pmsg1(-1, "\tm_nb_bytes: %s\n", edit_int64(m_nb_bytes, ed1));
1372 Pmsg1(-1, "\tm_last_tick: %s\n", edit_int64(m_last_tick, ed1));
1373 #endif
1374 };
1375
1376
1377 #ifdef TEST_PROGRAM
1378 #include "unittests.h"
1379
free_my_jcr(JCR * jcr)1380 void free_my_jcr(JCR *jcr){
1381 /* TODO: handle full JCR free */
1382 free_jcr(jcr);
1383 };
1384
/* Name pattern of the netcat capture file; %d receives a process id */
#define ofnamefmt "/tmp/bsockcore.%d.test"

/* Payload sent over the socket by the test */
const char *data = "This is a BSOCKCORE communication test: 1234567\n";

/* Hex dump the capture file is compared against */
const char *hexdata =
   "< 00000000 54 68 69 73 20 69 73 20 61 20 42 53 4f 43 4b 43 # This is a BSOCKC\n"
   "< 00000010 4f 52 45 20 63 6f 6d 6d 75 6e 69 63 61 74 69 6f # ORE communicatio\n"
   "< 00000020 6e 20 74 65 73 74 3a 20 31 32 33 34 35 36 37 0a # n test: 1234567.\n";
1390
main()1391 int main()
1392 {
1393 Unittests bsockcore_test("bsockcore_test", true);
1394 BSOCKCORE *bs;
1395 pid_t pid;
1396 int rc;
1397 char *host = (char*)"localhost";
1398 char *name = (char*)"Test";
1399 JCR *jcr;
1400 bool btest;
1401 char buf[256]; // extend this buffer when hexdata becomes longer
1402 int fd;
1403
1404 Pmsg0(0, "Initialize tests ...\n");
1405
1406 jcr = new_jcr(sizeof(JCR), NULL);
1407 bs = New(BSOCKCORE);
1408 bs->set_jcr(jcr);
1409 ok(bs != NULL && bs->jcr() == jcr,
1410 "Default initialization");
1411
1412 Pmsg0(0, "Preparing fork\n");
1413 pid = fork();
1414 if (0 == pid){
1415 Pmsg0(0, "Prepare to execute netcat\n");
1416 pid_t mypid = getpid();
1417 char ofname[30];
1418 snprintf(ofname, sizeof(ofname), ofnamefmt, mypid);
1419 rc = execl("/bin/netcat", "netcat", "-v", "-p", "20000", "-l", "-o", ofname, NULL);
1420 Pmsg1(0, "Error executing netcat: %s\n", strerror(rc));
1421 exit(1);
1422 }
1423 Pmsg1(0, "After fork: %d\n", pid);
1424 bmicrosleep(2, 0); // we wait a bit to netcat to start
1425 btest = bs->connect(jcr, 1, 10, 0, name, host, NULL, 20000, 0);
1426 ok(btest, "BSOCKCORE connection test");
1427 if (btest){
1428 /* we are connected, so send some data */
1429 bs->fsend("%s", data);
1430 bmicrosleep(2, 0); // wait until data received by netcat
1431 bs->close();
1432 ok(bs->is_closed(), "Close bsockcore");
1433 /* now check what netcat received */
1434 char ofname[30];
1435 snprintf(ofname, sizeof(ofname), ofnamefmt, pid);
1436 fd = open(ofname, O_RDONLY);
1437 btest = false;
1438 if (fd > 0){
1439 btest = true;
1440 read(fd, buf, strlen(hexdata));
1441 close(fd);
1442 unlink(ofname);
1443 }
1444 ok(btest, "Output file available");
1445 ok(strcmp(buf, hexdata) == 0, "Communication data");
1446 }
1447 kill(pid, SIGTERM);
1448 delete(bs);
1449 free_my_jcr(jcr);
1450 term_last_jobs_list();
1451 return report();
1452 };
1453 #endif /* TEST_PROGRAM */
1454