1 /*
2 BAREOS® - Backup Archiving REcovery Open Sourced
3
4 Copyright (C) 2016-2020 Bareos GmbH & Co. KG
5
6 This program is Free Software; you can redistribute it and/or
7 modify it under the terms of version three of the GNU Affero General Public
8 License as published by the Free Software Foundation and included
9 in the file LICENSE.
10
11 This program is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 Affero General Public License for more details.
15
16 You should have received a copy of the GNU Affero General Public License
17 along with this program; if not, write to the Free Software
18 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 02110-1301, USA.
20 */
21
22 /*
23 * Connection and Connection Pool
24 */
25
26 #include "include/bareos.h"
27 #include "connection_pool.h"
28 #include "lib/util.h"
29 #include "lib/alist.h"
30 #include "lib/bsys.h"
31 #include "lib/bsock.h"
32
33 /*
34 * Connection
35 */
Connection(const char * name,int protocol_version,BareosSocket * socket,bool authenticated)36 Connection::Connection(const char* name,
37 int protocol_version,
38 BareosSocket* socket,
39 bool authenticated)
40 {
41 tid_ = pthread_self();
42 connect_time_ = time(nullptr);
43 in_use_ = false;
44 authenticated_ = authenticated;
45 bstrncpy(name_, name, sizeof(name_));
46 protocol_version_ = protocol_version;
47 socket_ = socket;
48 pthread_mutex_init(&mutex_, nullptr);
49 }
50
~Connection()51 Connection::~Connection() { pthread_mutex_destroy(&mutex_); }
52
53 /*
54 * Check if connection is still active.
55 */
check(int timeout_data)56 bool Connection::check(int timeout_data)
57 {
58 int data_available = 0;
59 bool ok = true;
60
61 /*
62 * Returns: 1 if data available, 0 if timeout, -1 if error
63 */
64 data_available = socket_->WaitDataIntr(timeout_data);
65
66 /*
67 * Use lock to prevent that data is read for job thread.
68 */
69 lock();
70 if (data_available < 0) {
71 ok = false;
72 } else if ((data_available > 0) && (!in_use_)) {
73 if (socket_->recv() <= 0) { ok = false; }
74
75 if (socket_->IsError()) { ok = false; }
76 }
77 unlock();
78
79 if (!ok) { socket_->close(); }
80
81 return ok;
82 }
83
84 /*
85 * Request to take over the connection (socket) from another thread.
86 */
take()87 bool Connection::take()
88 {
89 bool result = false;
90 lock();
91 if (!in_use_) {
92 in_use_ = true;
93 result = true;
94 }
95 unlock();
96
97 return result;
98 }
99
100 /*
101 * Connection Pool
102 */
ConnectionPool()103 ConnectionPool::ConnectionPool()
104 {
105 connections_ = new alist(10, false);
106 /*
107 * Initialize mutex and condition variable objects.
108 */
109 pthread_mutex_init(&add_mutex_, nullptr);
110 pthread_cond_init(&add_cond_var_, nullptr);
111 }
112
~ConnectionPool()113 ConnectionPool::~ConnectionPool()
114 {
115 delete (connections_);
116 pthread_mutex_destroy(&add_mutex_);
117 pthread_cond_destroy(&add_cond_var_);
118 }
119
cleanup()120 void ConnectionPool::cleanup()
121 {
122 Connection* connection = nullptr;
123 int i = 0;
124 for (i = connections_->size() - 1; i >= 0; i--) {
125 connection = (Connection*)connections_->get(i);
126 Dmsg2(800, "checking connection %s (%d)\n", connection->name(), i);
127 if (!connection->check()) {
128 Dmsg2(120, "connection %s (%d) is terminated => removed\n",
129 connection->name(), i);
130 connections_->remove(i);
131 delete (connection);
132 }
133 }
134 }
135
get_as_alist()136 alist* ConnectionPool::get_as_alist()
137 {
138 cleanup();
139 return connections_;
140 }
141
add(Connection * connection)142 bool ConnectionPool::add(Connection* connection)
143 {
144 cleanup();
145 Dmsg1(120, "add connection: %s\n", connection->name());
146 P(add_mutex_);
147 connections_->append(connection);
148 pthread_cond_broadcast(&add_cond_var_);
149 V(add_mutex_);
150 return true;
151 }
152
add_connection(const char * name,int fd_protocol_version,BareosSocket * socket,bool authenticated)153 Connection* ConnectionPool::add_connection(const char* name,
154 int fd_protocol_version,
155 BareosSocket* socket,
156 bool authenticated)
157 {
158 Connection* connection
159 = new Connection(name, fd_protocol_version, socket, authenticated);
160 if (!add(connection)) {
161 delete (connection);
162 return nullptr;
163 }
164 return connection;
165 }
166
get_connection(const char * name)167 Connection* ConnectionPool::get_connection(const char* name)
168 {
169 Connection* connection = nullptr;
170 if (!name) { return nullptr; }
171 foreach_alist (connection, connections_) {
172 if (connection->check() && connection->authenticated()
173 && connection->bsock() && (!connection->in_use())
174 && bstrcmp(name, connection->name())) {
175 Dmsg1(120, "found connection from client %s\n", connection->name());
176 return connection;
177 }
178 }
179 return nullptr;
180 }
181
get_connection(const char * name,timespec & timeout)182 Connection* ConnectionPool::get_connection(const char* name, timespec& timeout)
183 {
184 Connection* connection = nullptr;
185 int errstat = 0;
186
187 if (!name) { return nullptr; }
188
189 while ((!connection) && (errstat == 0)) {
190 connection = get_connection(name);
191 if (!connection) {
192 Dmsg0(120, "waiting for new connections.\n");
193 errstat = WaitForNewConnection(timeout);
194 if (errstat == ETIMEDOUT) {
195 Dmsg0(120, "timeout while waiting for new connections.\n");
196 }
197 }
198 }
199
200 return connection;
201 }
202
WaitForNewConnection(timespec & timeout)203 int ConnectionPool::WaitForNewConnection(timespec& timeout)
204 {
205 int errstat;
206
207 P(add_mutex_);
208 errstat = pthread_cond_timedwait(&add_cond_var_, &add_mutex_, &timeout);
209 V(add_mutex_);
210 if (errstat == 0) {
211 Dmsg0(120, "new connection available.\n");
212 } else if (errstat == ETIMEDOUT) {
213 Dmsg0(120, "timeout.\n");
214 } else {
215 Emsg1(M_ERROR, 0, "error: %d\n", errstat);
216 }
217 return errstat;
218 }
219
remove(Connection * connection)220 bool ConnectionPool::remove(Connection* connection)
221 {
222 bool removed = false;
223 for (int i = connections_->size() - 1; i >= 0; i--) {
224 if (connections_->get(i) == connection) {
225 connections_->remove(i);
226 removed = true;
227 Dmsg0(120, "removed connection.\n");
228 break;
229 }
230 }
231 return removed;
232 }
233
remove(const char * name,int timeout_in_seconds)234 Connection* ConnectionPool::remove(const char* name, int timeout_in_seconds)
235 {
236 bool done = false;
237 Connection* result = nullptr;
238 Connection* connection = nullptr;
239 struct timespec timeout;
240
241 ConvertTimeoutToTimespec(timeout, timeout_in_seconds);
242
243 Dmsg2(120, "waiting for connection from client %s. Timeout: %ds.\n", name,
244 timeout_in_seconds);
245
246 while (!done) {
247 connection = get_connection(name, timeout);
248 if (!connection) {
249 /*
250 * nullptr is returned only on timeout (or other internal errors).
251 */
252 return nullptr;
253 }
254 if (connection->take()) {
255 result = connection;
256 remove(connection);
257 done = true;
258 } else {
259 /*
260 * As we can not take it, we assume it is already taken by another thread.
261 * In any case, we remove it, to prevent to get stuck in this while loop.
262 */
263 remove(connection);
264 }
265 }
266 return result;
267 }
268