1 /*
2 Bacula(R) - The Network Backup Solution
3
4 Copyright (C) 2000-2020 Kern Sibbald
5
6 The original author of Bacula is Kern Sibbald, with contributions
7 from many others, a complete list can be found in the file AUTHORS.
8
9 You may use this file and others of this release according to the
10 license defined in the LICENSE file, which includes the Affero General
11 Public License, v3.0 ("AGPLv3") and some additional permissions and
12 terms pursuant to its AGPLv3 Section 7.
13
14 This notice must be preserved when any source code is
15 conveyed and/or propagated.
16
17 Bacula(R) is a registered trademark of Kern Sibbald.
18 */
19
20 #ifndef __BGET_MSG_H_
21 #define __BGET_MSG_H_
22
23 #include "bacula.h"
24
/* Forward declaration; the full definition lives elsewhere. */
struct DEV_RECORD;

/* 64-bit unsigned block address and 64-bit signed block index. */
typedef uint64_t blockaddr;
typedef int64_t  blockidx;

#if defined(COMMUNITY)
/* Size constants, only defined when COMMUNITY is set. */
#define BLOCK_HEAD_SIZE       4    /* only one int */
#define GETMSG_MAX_BLOCK_SIZE (65*1024-BLOCK_HEAD_SIZE)
#define GETMSG_MAX_HASH_SIZE  64   /* SHA512 */
#define GETMSG_MAX_MSG_SIZE   (GETMSG_MAX_BLOCK_SIZE+GETMSG_MAX_HASH_SIZE+sizeof(uint32_t)+OFFSET_FADDR_SIZE+100)
#endif /* COMMUNITY */
36
37
/*
 * class bmessage and class GetMsg are used by client side rehydration
 * to control latency
 *
 */
43
44 class bmessage: public SMARTALLOC
45 {
46 public:
47 enum { bm_none, bm_ready, bm_busy, bm_ref };
48
49 POOLMEM *msg; // exchanged with BSOCK
50 int32_t msglen; // length from BSOCK
51 int32_t origlen; // length before rehydration, to be compared with length in header
52 char *rbuf; // adjusted to point to data inside *msg
53 int32_t rbuflen; // adjusted from msglen
54 int status;
55 int ret; // return value from bget_msg()
56 int jobbytes; // must be added to jcr->JobBytes if block is downloaded
57
58 unsigned char *hash; // point to an offset of msg, the hash
59 char *eraw; // point to an offset of msg, lz4encoded raw data
60 blockaddr *paddr; // point to an offset of msg
61 blockaddr addr; // the block address
62 int32_t dedup_size; // block size from reference
63 bool is_sparse; // is a SPARSE stream
64 bool is_header; // is a header record
65 bool is_dedup; // is a dedup reference
66 bool do_flowcontrol;
67 bmessage(int bufsize);
68 virtual ~bmessage();
69 void swap(BSOCK *sock);
70 };
71
72 class GetMsg: public SMARTALLOC
73 {
74 public:
75 JCR *jcr;
76 BSOCK *bsock;
77 const char *rec_header; /* Format of a header */
78 int32_t bufsize; /* "ideal" bufsize from JCR */
79
80 bool m_is_stop; /* set by the read thread when bsock->is_stop() */
81 bool m_is_done; /* set when the read thread finish (no more record will be pushed) */
82 bool m_is_error; /* set when the read thread get an error */
83
84 int32_t m_use_count;
85
86 pthread_mutex_t mutex;
87 pthread_cond_t cond;
88 bmessage *bmsg_aux;
89 bmessage *bmsg; // local bmsg used by bget_msg(NULL)
90 int32_t msglen; // used to mimic BSOCK, updated by bget_msg()
91 POOLMEM *msg; // used to mimic BSOCK, updated by bget_msg()
92
inc_use_count(void)93 void inc_use_count(void) {P(mutex); m_use_count++; V(mutex); };
dec_use_count(void)94 void dec_use_count(void) {P(mutex); m_use_count--; V(mutex); };
use_count()95 int32_t use_count() { int32_t v; P(mutex); v = m_use_count; V(mutex); return v;};
96
97
98 GetMsg(JCR *a_jcr, BSOCK *a_bsock, const char *a_rec_header, int32_t a_bufsize);
99 virtual ~GetMsg();
100
101 virtual int bget_msg(bmessage **pbmsg=NULL);
do_read_sock_thread(void)102 inline virtual void *do_read_sock_thread(void) { return NULL; };
start_read_sock()103 inline virtual int start_read_sock() { return 0; };
wait_read_sock(int)104 inline virtual void *wait_read_sock(int /*emergency_quit*/) { return NULL;};
105
is_stop()106 virtual bool is_stop() { return (m_is_stop!=false); };
is_done()107 virtual bool is_done() { return (m_is_done!=false); };
is_error()108 virtual bool is_error(){ return (m_is_error!=false); };
109
new_msg()110 bmessage *new_msg() { return New(bmessage(bufsize)); };
111
112 /* used by inherited classes to commit any pending operations at the end */
commit(POOLMEM * &,uint32_t)113 virtual int commit(POOLMEM *&/*errmsg*/, uint32_t /*jobid*/) { return 0; };
114
115 /* used by classes to commit any pending operations at the end */
dedup_store_chunk(DEV_RECORD *,const char *,int,char *,char *,POOLMEM * &)116 virtual bool dedup_store_chunk(DEV_RECORD */*rec*/, const char */*rbuf*/, int /*rbuflen*/,
117 char */*dedup_ref_buf*/, char */*wdedup_ref_buf*/, POOLMEM *&/*errmsg*/)
118 { return true; /* OK */ };
119
120 };
121
122 /* Call this function to release the memory associated with the message queue
123 * The reading thread is using the BufferedMsgBase to work, so we need to free
124 * the memory only when the main thread and the reading thread agree
125 */
free_GetMsg(GetMsg * b)126 inline void free_GetMsg(GetMsg *b)
127 {
128 b->dec_use_count();
129 ASSERT2(b->use_count() >= 0, "GetMsg use_count too low");
130 if (b->use_count() == 0) {
131 delete b;
132 }
133 }
134
135 #endif
136