1 /*
2    Bacula(R) - The Network Backup Solution
3 
4    Copyright (C) 2000-2020 Kern Sibbald
5 
6    The original author of Bacula is Kern Sibbald, with contributions
7    from many others, a complete list can be found in the file AUTHORS.
8 
9    You may use this file and others of this release according to the
10    license defined in the LICENSE file, which includes the Affero General
11    Public License, v3.0 ("AGPLv3") and some additional permissions and
12    terms pursuant to its AGPLv3 Section 7.
13 
14    This notice must be preserved when any source code is
15    conveyed and/or propagated.
16 
17    Bacula(R) is a registered trademark of Kern Sibbald.
18 */
19 
20 #ifndef __BGET_MSG_H_
21 #define __BGET_MSG_H_
22 
23 #include "bacula.h"
24 
/* Forward declaration: this header only uses pointers to DEV_RECORD */
struct DEV_RECORD;

/* blockaddr is an unsigned 64-bit value, blockidx a signed 64-bit value */
typedef uint64_t blockaddr;
typedef int64_t  blockidx;

/* The size limits below are defined here only when COMMUNITY is defined */
#ifdef COMMUNITY
#define BLOCK_HEAD_SIZE               4       // only one int
#define GETMSG_MAX_BLOCK_SIZE    (65*1024-BLOCK_HEAD_SIZE)
#define GETMSG_MAX_HASH_SIZE     64  /* SHA512 */
/*
 * Sum of the maximum block size, the maximum hash size, one uint32_t,
 * OFFSET_FADDR_SIZE (not defined in this header) and 100 more bytes.
 * NOTE(review): the extra 100 bytes look like headroom -- confirm.
 * The sizeof() term gives the whole expression type size_t.
 */
#define GETMSG_MAX_MSG_SIZE      (GETMSG_MAX_BLOCK_SIZE+GETMSG_MAX_HASH_SIZE+sizeof(uint32_t)+OFFSET_FADDR_SIZE+100)
#endif
36 
37 
/*
 * class bmessage and class GetMsg are used by client side rehydration
 * to control latency
 *
 */
43 
/*
 * bmessage: one message buffer together with the bookkeeping fields that
 * describe it. The buffer itself (msg) is exchanged with a BSOCK; the other
 * pointers (rbuf, hash, eraw, paddr) point inside that buffer.
 *
 * Only the declaration is visible here; the constructor, destructor and
 * swap() are implemented elsewhere.
 */
class bmessage: public SMARTALLOC
{
public:
   /* NOTE(review): presumably the possible values of "status" -- confirm */
   enum { bm_none, bm_ready, bm_busy, bm_ref };

   POOLMEM *msg;    // exchanged with BSOCK
   int32_t msglen;  // length from BSOCK
   int32_t origlen; // length before rehydration, to be compared with length in header
   char *rbuf;      // adjusted to point to data inside *msg
   int32_t rbuflen; // adjusted from msglen
   int status;
   int ret;         // return value from bget_msg()
   int jobbytes;    // must be added to jcr->JobBytes if block is downloaded

   unsigned char *hash; // point to an offset of msg, the hash
   char *eraw;          // point to an offset of msg, lz4encoded raw data
   blockaddr *paddr;      // point to an offset of msg
   blockaddr addr;        // the block address
   int32_t dedup_size;   // block size from reference
   bool is_sparse;  // is a SPARSE stream
   bool is_header;  // is a header record
   bool is_dedup;   // is a dedup reference
   bool do_flowcontrol;
   /* NOTE(review): bufsize is presumably the initial size of msg -- confirm */
   bmessage(int bufsize);
   virtual ~bmessage();
   /* NOTE(review): presumably exchanges msg/msglen with sock -- confirm in the .c file */
   void swap(BSOCK *sock);
};
71 
72 class GetMsg: public SMARTALLOC
73 {
74 public:
75    JCR *jcr;
76    BSOCK *bsock;
77    const char *rec_header;        /* Format of a header */
78    int32_t bufsize;               /* "ideal" bufsize from JCR */
79 
80    bool m_is_stop;                /* set by the read thread when bsock->is_stop() */
81    bool m_is_done;                /* set when the read thread finish (no more record will be pushed) */
82    bool m_is_error;               /* set when the read thread get an error */
83 
84    int32_t m_use_count;
85 
86    pthread_mutex_t mutex;
87    pthread_cond_t cond;
88    bmessage *bmsg_aux;
89    bmessage *bmsg;   // local bmsg used by bget_msg(NULL)
90    int32_t msglen;   // used to mimic BSOCK, updated by bget_msg()
91    POOLMEM *msg;     // used to mimic BSOCK, updated by bget_msg()
92 
inc_use_count(void)93    void inc_use_count(void) {P(mutex); m_use_count++; V(mutex); };
dec_use_count(void)94    void dec_use_count(void) {P(mutex); m_use_count--; V(mutex); };
use_count()95    int32_t use_count() { int32_t v; P(mutex); v = m_use_count; V(mutex); return v;};
96 
97 
98    GetMsg(JCR *a_jcr, BSOCK *a_bsock, const char *a_rec_header, int32_t a_bufsize);
99    virtual ~GetMsg();
100 
101    virtual int bget_msg(bmessage **pbmsg=NULL);
do_read_sock_thread(void)102    inline virtual void *do_read_sock_thread(void) { return NULL; };
start_read_sock()103    inline virtual int start_read_sock() { return 0; };
wait_read_sock(int)104    inline virtual void *wait_read_sock(int /*emergency_quit*/) { return NULL;};
105 
is_stop()106    virtual bool is_stop() { return (m_is_stop!=false); };
is_done()107    virtual bool is_done() { return (m_is_done!=false); };
is_error()108    virtual bool is_error(){ return (m_is_error!=false); };
109 
new_msg()110    bmessage *new_msg() { return New(bmessage(bufsize)); };
111 
112    /* used by inherited classes to commit any pending operations at the end */
commit(POOLMEM * &,uint32_t)113    virtual int commit(POOLMEM *&/*errmsg*/, uint32_t /*jobid*/) { return 0; };
114 
115    /* used by classes to commit any pending operations at the end */
dedup_store_chunk(DEV_RECORD *,const char *,int,char *,char *,POOLMEM * &)116    virtual bool dedup_store_chunk(DEV_RECORD */*rec*/, const char */*rbuf*/, int /*rbuflen*/,
117                                   char */*dedup_ref_buf*/, char */*wdedup_ref_buf*/, POOLMEM *&/*errmsg*/)
118       { return true; /* OK */ };
119 
120 };
121 
122 /* Call this function to release the memory associated with the message queue
123  * The reading thread is using the BufferedMsgBase to work, so we need to free
124  * the memory only when the main thread and the reading thread agree
125  */
free_GetMsg(GetMsg * b)126 inline void free_GetMsg(GetMsg *b)
127 {
128    b->dec_use_count();
129    ASSERT2(b->use_count() >= 0, "GetMsg use_count too low");
130    if (b->use_count() == 0) {
131       delete b;
132    }
133 }
134 
135 #endif
136