1 /*
2 BAREOS® - Backup Archiving REcovery Open Sourced
3
4 Copyright (C) 2013-2018 Bareos GmbH & Co. KG
5
6 This program is Free Software; you can redistribute it and/or
7 modify it under the terms of version three of the GNU Affero General Public
8 License as published by the Free Software Foundation and included
9 in the file LICENSE.
10
11 This program is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 Affero General Public License for more details.
15
16 You should have received a copy of the GNU Affero General Public License
17 along with this program; if not, write to the Free Software
18 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 02110-1301, USA.
20 */
21 /*
22 * Marco van Wieringen, August 2013.
23 */
24
25 /*
26 * Circular buffer used for producer/consumer problem with pthreads.
27 */
28 #include "include/bareos.h"
29 #include "cbuf.h"
30
31 /*
32 * Initialize a new circular buffer.
33 */
init(int capacity)34 int CircularBuffer::init(int capacity)
35 {
36 if (pthread_mutex_init(&lock_, NULL) != 0) { return -1; }
37
38 if (pthread_cond_init(¬full_, NULL) != 0) {
39 pthread_mutex_destroy(&lock_);
40 return -1;
41 }
42
43 if (pthread_cond_init(¬empty_, NULL) != 0) {
44 pthread_cond_destroy(¬full_);
45 pthread_mutex_destroy(&lock_);
46 return -1;
47 }
48
49 next_in_ = 0;
50 next_out_ = 0;
51 size_ = 0;
52 capacity_ = capacity;
53 if (data_) { free(data_); }
54 data_ = (void**)malloc(capacity_ * sizeof(void*));
55
56 return 0;
57 }
58
59 /*
60 * Destroy a circular buffer.
61 */
destroy()62 void CircularBuffer::destroy()
63 {
64 pthread_cond_destroy(¬empty_);
65 pthread_cond_destroy(¬full_);
66 pthread_mutex_destroy(&lock_);
67 if (data_) {
68 free(data_);
69 data_ = NULL;
70 }
71 }
72
73 /*
74 * Enqueue a new item into the circular buffer.
75 */
enqueue(void * data)76 int CircularBuffer::enqueue(void* data)
77 {
78 if (pthread_mutex_lock(&lock_) != 0) { return -1; }
79
80 /*
81 * Wait while the buffer is full.
82 */
83 while (full()) { pthread_cond_wait(¬full_, &lock_); }
84 data_[next_in_++] = data;
85 size_++;
86 next_in_ %= capacity_;
87
88 /*
89 * Let any waiting consumer know there is data.
90 */
91 pthread_cond_broadcast(¬empty_);
92
93 pthread_mutex_unlock(&lock_);
94
95 return 0;
96 }
97
98 /*
99 * Dequeue an item from the circular buffer.
100 */
dequeue()101 void* CircularBuffer::dequeue()
102 {
103 void* data = NULL;
104
105 if (pthread_mutex_lock(&lock_) != 0) { return NULL; }
106
107 /*
108 * Wait while there is nothing in the buffer
109 */
110 while (empty() && !flush_) { pthread_cond_wait(¬empty_, &lock_); }
111
112 /*
113 * When we are requested to flush and there is no data left return NULL.
114 */
115 if (empty() && flush_) { goto bail_out; }
116
117 data = data_[next_out_++];
118 size_--;
119 next_out_ %= capacity_;
120
121 /*
122 * Let all waiting producers know there is room.
123 */
124 pthread_cond_broadcast(¬full_);
125
126 bail_out:
127 pthread_mutex_unlock(&lock_);
128
129 return data;
130 }
131
132 /*
133 * Make sure there is a free next slot available on the circular buffer.
134 * So the next enqueue will not block but we block now until one is available.
135 */
NextSlot()136 int CircularBuffer::NextSlot()
137 {
138 if (pthread_mutex_lock(&lock_) != 0) { return -1; }
139
140 /*
141 * Wait while the buffer is full.
142 */
143 while (full()) { pthread_cond_wait(¬full_, &lock_); }
144
145 pthread_mutex_unlock(&lock_);
146
147 return next_in_;
148 }
149
150 /*
151 * Flush the circular buffer. Any waiting consumer will be wakened and will
152 * see we are in flush state.
153 */
flush()154 int CircularBuffer::flush()
155 {
156 if (pthread_mutex_lock(&lock_) != 0) { return -1; }
157
158 /*
159 * Set the flush flag.
160 */
161 flush_ = true;
162
163 /*
164 * Let all waiting consumers know there will be no more data.
165 */
166 pthread_cond_broadcast(¬empty_);
167
168 pthread_mutex_unlock(&lock_);
169
170 return 0;
171 }
172