1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #include "mongo/platform/basic.h"
32 
33 #include "mongo/executor/connection_pool_test_fixture.h"
34 
35 #include "mongo/stdx/memory.h"
36 
37 namespace mongo {
38 namespace executor {
39 namespace connection_pool_test_details {
40 
TimerImpl(PoolImpl * global)41 TimerImpl::TimerImpl(PoolImpl* global) : _global(global) {}
42 
~TimerImpl()43 TimerImpl::~TimerImpl() {
44     cancelTimeout();
45 }
46 
setTimeout(Milliseconds timeout,TimeoutCallback cb)47 void TimerImpl::setTimeout(Milliseconds timeout, TimeoutCallback cb) {
48     _cb = std::move(cb);
49     _expiration = _global->now() + timeout;
50 
51     _timers.emplace(this);
52 }
53 
cancelTimeout()54 void TimerImpl::cancelTimeout() {
55     _timers.erase(this);
56 }
57 
clear()58 void TimerImpl::clear() {
59     _timers.clear();
60 }
61 
fireIfNecessary()62 void TimerImpl::fireIfNecessary() {
63     auto now = PoolImpl().now();
64 
65     auto timers = _timers;
66 
67     for (auto&& x : timers) {
68         if (_timers.count(x) && (x->_expiration <= now)) {
69             x->_cb();
70         }
71     }
72 }
73 
74 std::set<TimerImpl*> TimerImpl::_timers;
75 
ConnectionImpl(const HostAndPort & hostAndPort,size_t generation,PoolImpl * global)76 ConnectionImpl::ConnectionImpl(const HostAndPort& hostAndPort, size_t generation, PoolImpl* global)
77     : _hostAndPort(hostAndPort),
78       _timer(global),
79       _global(global),
80       _id(_idCounter++),
81       _generation(generation) {}
82 
indicateUsed()83 void ConnectionImpl::indicateUsed() {
84     _lastUsed = _global->now();
85 }
86 
indicateSuccess()87 void ConnectionImpl::indicateSuccess() {
88     _status = Status::OK();
89 }
90 
indicateFailure(Status status)91 void ConnectionImpl::indicateFailure(Status status) {
92     _status = std::move(status);
93 }
94 
resetToUnknown()95 void ConnectionImpl::resetToUnknown() {
96     _status = ConnectionPool::kConnectionStateUnknown;
97 }
98 
id() const99 size_t ConnectionImpl::id() const {
100     return _id;
101 }
102 
getHostAndPort() const103 const HostAndPort& ConnectionImpl::getHostAndPort() const {
104     return _hostAndPort;
105 }
106 
isHealthy()107 bool ConnectionImpl::isHealthy() {
108     return true;
109 }
110 
clear()111 void ConnectionImpl::clear() {
112     _setupQueue.clear();
113     _refreshQueue.clear();
114     _pushSetupQueue.clear();
115     _pushRefreshQueue.clear();
116 }
117 
pushSetup(PushSetupCallback status)118 void ConnectionImpl::pushSetup(PushSetupCallback status) {
119     _pushSetupQueue.push_back(status);
120 
121     if (_setupQueue.size()) {
122         auto connPtr = _setupQueue.front();
123         auto callback = _pushSetupQueue.front();
124         _setupQueue.pop_front();
125         _pushSetupQueue.pop_front();
126 
127         auto cb = connPtr->_setupCallback;
128         cb(connPtr, callback());
129     }
130 }
131 
pushSetup(Status status)132 void ConnectionImpl::pushSetup(Status status) {
133     pushSetup([status]() { return status; });
134 }
135 
setupQueueDepth()136 size_t ConnectionImpl::setupQueueDepth() {
137     return _setupQueue.size();
138 }
139 
pushRefresh(PushRefreshCallback status)140 void ConnectionImpl::pushRefresh(PushRefreshCallback status) {
141     _pushRefreshQueue.push_back(status);
142 
143     if (_refreshQueue.size()) {
144         auto connPtr = _refreshQueue.front();
145         auto callback = _pushRefreshQueue.front();
146 
147         _refreshQueue.pop_front();
148         _pushRefreshQueue.pop_front();
149 
150         auto cb = connPtr->_refreshCallback;
151         cb(connPtr, callback());
152     }
153 }
154 
pushRefresh(Status status)155 void ConnectionImpl::pushRefresh(Status status) {
156     pushRefresh([status]() { return status; });
157 }
158 
refreshQueueDepth()159 size_t ConnectionImpl::refreshQueueDepth() {
160     return _refreshQueue.size();
161 }
162 
getLastUsed() const163 Date_t ConnectionImpl::getLastUsed() const {
164     return _lastUsed;
165 }
166 
getStatus() const167 const Status& ConnectionImpl::getStatus() const {
168     return _status;
169 }
170 
setTimeout(Milliseconds timeout,TimeoutCallback cb)171 void ConnectionImpl::setTimeout(Milliseconds timeout, TimeoutCallback cb) {
172     _timer.setTimeout(timeout, cb);
173 }
174 
cancelTimeout()175 void ConnectionImpl::cancelTimeout() {
176     _timer.cancelTimeout();
177 }
178 
setup(Milliseconds timeout,SetupCallback cb)179 void ConnectionImpl::setup(Milliseconds timeout, SetupCallback cb) {
180     _setupCallback = std::move(cb);
181 
182     _timer.setTimeout(timeout, [this] {
183         _setupCallback(this, Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, "timeout"));
184     });
185 
186     _setupQueue.push_back(this);
187 
188     if (_pushSetupQueue.size()) {
189         auto connPtr = _setupQueue.front();
190         auto callback = _pushSetupQueue.front();
191         _setupQueue.pop_front();
192         _pushSetupQueue.pop_front();
193 
194         auto refreshCb = connPtr->_setupCallback;
195         refreshCb(connPtr, callback());
196     }
197 }
198 
refresh(Milliseconds timeout,RefreshCallback cb)199 void ConnectionImpl::refresh(Milliseconds timeout, RefreshCallback cb) {
200     _refreshCallback = std::move(cb);
201 
202     _timer.setTimeout(timeout, [this] {
203         _refreshCallback(this, Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, "timeout"));
204     });
205 
206     _refreshQueue.push_back(this);
207 
208     if (_pushRefreshQueue.size()) {
209         auto connPtr = _refreshQueue.front();
210         auto callback = _pushRefreshQueue.front();
211 
212         _refreshQueue.pop_front();
213         _pushRefreshQueue.pop_front();
214 
215         auto refreshCb = connPtr->_refreshCallback;
216         refreshCb(connPtr, callback());
217     }
218 }
219 
getGeneration() const220 size_t ConnectionImpl::getGeneration() const {
221     return _generation;
222 }
223 
224 std::deque<ConnectionImpl::PushSetupCallback> ConnectionImpl::_pushSetupQueue;
225 std::deque<ConnectionImpl::PushRefreshCallback> ConnectionImpl::_pushRefreshQueue;
226 std::deque<ConnectionImpl*> ConnectionImpl::_setupQueue;
227 std::deque<ConnectionImpl*> ConnectionImpl::_refreshQueue;
228 size_t ConnectionImpl::_idCounter = 1;
229 
makeConnection(const HostAndPort & hostAndPort,size_t generation)230 std::unique_ptr<ConnectionPool::ConnectionInterface> PoolImpl::makeConnection(
231     const HostAndPort& hostAndPort, size_t generation) {
232     return stdx::make_unique<ConnectionImpl>(hostAndPort, generation, this);
233 }
234 
makeTimer()235 std::unique_ptr<ConnectionPool::TimerInterface> PoolImpl::makeTimer() {
236     return stdx::make_unique<TimerImpl>(this);
237 }
238 
now()239 Date_t PoolImpl::now() {
240     return _now.get_value_or(Date_t::now());
241 }
242 
setNow(Date_t now)243 void PoolImpl::setNow(Date_t now) {
244     _now = now;
245     TimerImpl::fireIfNecessary();
246 }
247 
248 boost::optional<Date_t> PoolImpl::_now;
249 
250 }  // namespace connection_pool_test_details
251 }  // namespace executor
252 }  // namespace mongo
253