1 /*
2    BAREOS® - Backup Archiving REcovery Open Sourced
3 
4    Copyright (C) 2013-2018 Bareos GmbH & Co. KG
5 
6    This program is Free Software; you can redistribute it and/or
7    modify it under the terms of version three of the GNU Affero General Public
8    License as published by the Free Software Foundation and included
9    in the file LICENSE.
10 
11    This program is distributed in the hope that it will be useful, but
12    WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14    Affero General Public License for more details.
15 
16    You should have received a copy of the GNU Affero General Public License
17    along with this program; if not, write to the Free Software
18    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19    02110-1301, USA.
20 */
21 /*
22  * Marco van Wieringen, August 2013.
23  */
24 
25 /*
26  * Circular buffer used for producer/consumer problem with pthreads.
27  */
28 #include "include/bareos.h"
29 #include "cbuf.h"
30 
31 /*
32  * Initialize a new circular buffer.
33  */
init(int capacity)34 int CircularBuffer::init(int capacity)
35 {
36   if (pthread_mutex_init(&lock_, NULL) != 0) { return -1; }
37 
38   if (pthread_cond_init(&notfull_, NULL) != 0) {
39     pthread_mutex_destroy(&lock_);
40     return -1;
41   }
42 
43   if (pthread_cond_init(&notempty_, NULL) != 0) {
44     pthread_cond_destroy(&notfull_);
45     pthread_mutex_destroy(&lock_);
46     return -1;
47   }
48 
49   next_in_ = 0;
50   next_out_ = 0;
51   size_ = 0;
52   capacity_ = capacity;
53   if (data_) { free(data_); }
54   data_ = (void**)malloc(capacity_ * sizeof(void*));
55 
56   return 0;
57 }
58 
59 /*
60  * Destroy a circular buffer.
61  */
destroy()62 void CircularBuffer::destroy()
63 {
64   pthread_cond_destroy(&notempty_);
65   pthread_cond_destroy(&notfull_);
66   pthread_mutex_destroy(&lock_);
67   if (data_) {
68     free(data_);
69     data_ = NULL;
70   }
71 }
72 
73 /*
74  * Enqueue a new item into the circular buffer.
75  */
enqueue(void * data)76 int CircularBuffer::enqueue(void* data)
77 {
78   if (pthread_mutex_lock(&lock_) != 0) { return -1; }
79 
80   /*
81    * Wait while the buffer is full.
82    */
83   while (full()) { pthread_cond_wait(&notfull_, &lock_); }
84   data_[next_in_++] = data;
85   size_++;
86   next_in_ %= capacity_;
87 
88   /*
89    * Let any waiting consumer know there is data.
90    */
91   pthread_cond_broadcast(&notempty_);
92 
93   pthread_mutex_unlock(&lock_);
94 
95   return 0;
96 }
97 
98 /*
99  * Dequeue an item from the circular buffer.
100  */
dequeue()101 void* CircularBuffer::dequeue()
102 {
103   void* data = NULL;
104 
105   if (pthread_mutex_lock(&lock_) != 0) { return NULL; }
106 
107   /*
108    * Wait while there is nothing in the buffer
109    */
110   while (empty() && !flush_) { pthread_cond_wait(&notempty_, &lock_); }
111 
112   /*
113    * When we are requested to flush and there is no data left return NULL.
114    */
115   if (empty() && flush_) { goto bail_out; }
116 
117   data = data_[next_out_++];
118   size_--;
119   next_out_ %= capacity_;
120 
121   /*
122    * Let all waiting producers know there is room.
123    */
124   pthread_cond_broadcast(&notfull_);
125 
126 bail_out:
127   pthread_mutex_unlock(&lock_);
128 
129   return data;
130 }
131 
132 /*
133  * Make sure there is a free next slot available on the circular buffer.
134  * So the next enqueue will not block but we block now until one is available.
135  */
NextSlot()136 int CircularBuffer::NextSlot()
137 {
138   if (pthread_mutex_lock(&lock_) != 0) { return -1; }
139 
140   /*
141    * Wait while the buffer is full.
142    */
143   while (full()) { pthread_cond_wait(&notfull_, &lock_); }
144 
145   pthread_mutex_unlock(&lock_);
146 
147   return next_in_;
148 }
149 
150 /*
151  * Flush the circular buffer. Any waiting consumer will be wakened and will
152  * see we are in flush state.
153  */
flush()154 int CircularBuffer::flush()
155 {
156   if (pthread_mutex_lock(&lock_) != 0) { return -1; }
157 
158   /*
159    * Set the flush flag.
160    */
161   flush_ = true;
162 
163   /*
164    * Let all waiting consumers know there will be no more data.
165    */
166   pthread_cond_broadcast(&notempty_);
167 
168   pthread_mutex_unlock(&lock_);
169 
170   return 0;
171 }
172