1 /*
2    BAREOS® - Backup Archiving REcovery Open Sourced
3 
4    Copyright (C) 2016-2020 Bareos GmbH & Co. KG
5 
6    This program is Free Software; you can redistribute it and/or
7    modify it under the terms of version three of the GNU Affero General Public
8    License as published by the Free Software Foundation and included
9    in the file LICENSE.
10 
11    This program is distributed in the hope that it will be useful, but
12    WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14    Affero General Public License for more details.
15 
16    You should have received a copy of the GNU Affero General Public License
17    along with this program; if not, write to the Free Software
18    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19    02110-1301, USA.
20 */
21 
22 /*
23  * Connection and Connection Pool
24  */
25 
26 #include "include/bareos.h"
27 #include "connection_pool.h"
28 #include "lib/util.h"
29 #include "lib/alist.h"
30 #include "lib/bsys.h"
31 #include "lib/bsock.h"
32 
33 /*
34  * Connection
35  */
Connection(const char * name,int protocol_version,BareosSocket * socket,bool authenticated)36 Connection::Connection(const char* name,
37                        int protocol_version,
38                        BareosSocket* socket,
39                        bool authenticated)
40 {
41   tid_ = pthread_self();
42   connect_time_ = time(nullptr);
43   in_use_ = false;
44   authenticated_ = authenticated;
45   bstrncpy(name_, name, sizeof(name_));
46   protocol_version_ = protocol_version;
47   socket_ = socket;
48   pthread_mutex_init(&mutex_, nullptr);
49 }
50 
~Connection()51 Connection::~Connection() { pthread_mutex_destroy(&mutex_); }
52 
53 /*
54  * Check if connection is still active.
55  */
check(int timeout_data)56 bool Connection::check(int timeout_data)
57 {
58   int data_available = 0;
59   bool ok = true;
60 
61   /*
62    * Returns: 1 if data available, 0 if timeout, -1 if error
63    */
64   data_available = socket_->WaitDataIntr(timeout_data);
65 
66   /*
67    * Use lock to prevent that data is read for job thread.
68    */
69   lock();
70   if (data_available < 0) {
71     ok = false;
72   } else if ((data_available > 0) && (!in_use_)) {
73     if (socket_->recv() <= 0) { ok = false; }
74 
75     if (socket_->IsError()) { ok = false; }
76   }
77   unlock();
78 
79   if (!ok) { socket_->close(); }
80 
81   return ok;
82 }
83 
84 /*
85  * Request to take over the connection (socket) from another thread.
86  */
take()87 bool Connection::take()
88 {
89   bool result = false;
90   lock();
91   if (!in_use_) {
92     in_use_ = true;
93     result = true;
94   }
95   unlock();
96 
97   return result;
98 }
99 
100 /*
101  * Connection Pool
102  */
ConnectionPool()103 ConnectionPool::ConnectionPool()
104 {
105   connections_ = new alist(10, false);
106   /*
107    * Initialize mutex and condition variable objects.
108    */
109   pthread_mutex_init(&add_mutex_, nullptr);
110   pthread_cond_init(&add_cond_var_, nullptr);
111 }
112 
~ConnectionPool()113 ConnectionPool::~ConnectionPool()
114 {
115   delete (connections_);
116   pthread_mutex_destroy(&add_mutex_);
117   pthread_cond_destroy(&add_cond_var_);
118 }
119 
cleanup()120 void ConnectionPool::cleanup()
121 {
122   Connection* connection = nullptr;
123   int i = 0;
124   for (i = connections_->size() - 1; i >= 0; i--) {
125     connection = (Connection*)connections_->get(i);
126     Dmsg2(800, "checking connection %s (%d)\n", connection->name(), i);
127     if (!connection->check()) {
128       Dmsg2(120, "connection %s (%d) is terminated => removed\n",
129             connection->name(), i);
130       connections_->remove(i);
131       delete (connection);
132     }
133   }
134 }
135 
get_as_alist()136 alist* ConnectionPool::get_as_alist()
137 {
138   cleanup();
139   return connections_;
140 }
141 
add(Connection * connection)142 bool ConnectionPool::add(Connection* connection)
143 {
144   cleanup();
145   Dmsg1(120, "add connection: %s\n", connection->name());
146   P(add_mutex_);
147   connections_->append(connection);
148   pthread_cond_broadcast(&add_cond_var_);
149   V(add_mutex_);
150   return true;
151 }
152 
add_connection(const char * name,int fd_protocol_version,BareosSocket * socket,bool authenticated)153 Connection* ConnectionPool::add_connection(const char* name,
154                                            int fd_protocol_version,
155                                            BareosSocket* socket,
156                                            bool authenticated)
157 {
158   Connection* connection
159       = new Connection(name, fd_protocol_version, socket, authenticated);
160   if (!add(connection)) {
161     delete (connection);
162     return nullptr;
163   }
164   return connection;
165 }
166 
get_connection(const char * name)167 Connection* ConnectionPool::get_connection(const char* name)
168 {
169   Connection* connection = nullptr;
170   if (!name) { return nullptr; }
171   foreach_alist (connection, connections_) {
172     if (connection->check() && connection->authenticated()
173         && connection->bsock() && (!connection->in_use())
174         && bstrcmp(name, connection->name())) {
175       Dmsg1(120, "found connection from client %s\n", connection->name());
176       return connection;
177     }
178   }
179   return nullptr;
180 }
181 
get_connection(const char * name,timespec & timeout)182 Connection* ConnectionPool::get_connection(const char* name, timespec& timeout)
183 {
184   Connection* connection = nullptr;
185   int errstat = 0;
186 
187   if (!name) { return nullptr; }
188 
189   while ((!connection) && (errstat == 0)) {
190     connection = get_connection(name);
191     if (!connection) {
192       Dmsg0(120, "waiting for new connections.\n");
193       errstat = WaitForNewConnection(timeout);
194       if (errstat == ETIMEDOUT) {
195         Dmsg0(120, "timeout while waiting for new connections.\n");
196       }
197     }
198   }
199 
200   return connection;
201 }
202 
WaitForNewConnection(timespec & timeout)203 int ConnectionPool::WaitForNewConnection(timespec& timeout)
204 {
205   int errstat;
206 
207   P(add_mutex_);
208   errstat = pthread_cond_timedwait(&add_cond_var_, &add_mutex_, &timeout);
209   V(add_mutex_);
210   if (errstat == 0) {
211     Dmsg0(120, "new connection available.\n");
212   } else if (errstat == ETIMEDOUT) {
213     Dmsg0(120, "timeout.\n");
214   } else {
215     Emsg1(M_ERROR, 0, "error: %d\n", errstat);
216   }
217   return errstat;
218 }
219 
remove(Connection * connection)220 bool ConnectionPool::remove(Connection* connection)
221 {
222   bool removed = false;
223   for (int i = connections_->size() - 1; i >= 0; i--) {
224     if (connections_->get(i) == connection) {
225       connections_->remove(i);
226       removed = true;
227       Dmsg0(120, "removed connection.\n");
228       break;
229     }
230   }
231   return removed;
232 }
233 
remove(const char * name,int timeout_in_seconds)234 Connection* ConnectionPool::remove(const char* name, int timeout_in_seconds)
235 {
236   bool done = false;
237   Connection* result = nullptr;
238   Connection* connection = nullptr;
239   struct timespec timeout;
240 
241   ConvertTimeoutToTimespec(timeout, timeout_in_seconds);
242 
243   Dmsg2(120, "waiting for connection from client %s. Timeout: %ds.\n", name,
244         timeout_in_seconds);
245 
246   while (!done) {
247     connection = get_connection(name, timeout);
248     if (!connection) {
249       /*
250        * nullptr is returned only on timeout (or other internal errors).
251        */
252       return nullptr;
253     }
254     if (connection->take()) {
255       result = connection;
256       remove(connection);
257       done = true;
258     } else {
259       /*
260        * As we can not take it, we assume it is already taken by another thread.
261        * In any case, we remove it, to prevent to get stuck in this while loop.
262        */
263       remove(connection);
264     }
265   }
266   return result;
267 }
268