1
2 /**
3 * Copyright (C) 2018-present MongoDB, Inc.
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the Server Side Public License, version 1,
7 * as published by MongoDB, Inc.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * Server Side Public License for more details.
13 *
14 * You should have received a copy of the Server Side Public License
15 * along with this program. If not, see
16 * <http://www.mongodb.com/licensing/server-side-public-license>.
17 *
18 * As a special exception, the copyright holders give permission to link the
19 * code of portions of this program with the OpenSSL library under certain
20 * conditions as described in each individual source file and distribute
21 * linked combinations including the program with the OpenSSL library. You
22 * must comply with the Server Side Public License in all respects for
23 * all of the code used other than as permitted herein. If you modify file(s)
24 * with this exception, you may extend this exception to your version of the
25 * file(s), but you are not obligated to do so. If you do not wish to do so,
26 * delete this exception statement from your version. If you delete this
27 * exception statement from all source files in the program, then also delete
28 * it in the license file.
29 */
30
31 #include "mongo/platform/basic.h"
32
33 #include "mongo/executor/connection_pool_test_fixture.h"
34
35 #include "mongo/stdx/memory.h"
36
37 namespace mongo {
38 namespace executor {
39 namespace connection_pool_test_details {
40
TimerImpl(PoolImpl * global)41 TimerImpl::TimerImpl(PoolImpl* global) : _global(global) {}
42
~TimerImpl()43 TimerImpl::~TimerImpl() {
44 cancelTimeout();
45 }
46
setTimeout(Milliseconds timeout,TimeoutCallback cb)47 void TimerImpl::setTimeout(Milliseconds timeout, TimeoutCallback cb) {
48 _cb = std::move(cb);
49 _expiration = _global->now() + timeout;
50
51 _timers.emplace(this);
52 }
53
cancelTimeout()54 void TimerImpl::cancelTimeout() {
55 _timers.erase(this);
56 }
57
clear()58 void TimerImpl::clear() {
59 _timers.clear();
60 }
61
fireIfNecessary()62 void TimerImpl::fireIfNecessary() {
63 auto now = PoolImpl().now();
64
65 auto timers = _timers;
66
67 for (auto&& x : timers) {
68 if (_timers.count(x) && (x->_expiration <= now)) {
69 x->_cb();
70 }
71 }
72 }
73
74 std::set<TimerImpl*> TimerImpl::_timers;
75
ConnectionImpl(const HostAndPort & hostAndPort,size_t generation,PoolImpl * global)76 ConnectionImpl::ConnectionImpl(const HostAndPort& hostAndPort, size_t generation, PoolImpl* global)
77 : _hostAndPort(hostAndPort),
78 _timer(global),
79 _global(global),
80 _id(_idCounter++),
81 _generation(generation) {}
82
indicateUsed()83 void ConnectionImpl::indicateUsed() {
84 _lastUsed = _global->now();
85 }
86
indicateSuccess()87 void ConnectionImpl::indicateSuccess() {
88 _status = Status::OK();
89 }
90
indicateFailure(Status status)91 void ConnectionImpl::indicateFailure(Status status) {
92 _status = std::move(status);
93 }
94
resetToUnknown()95 void ConnectionImpl::resetToUnknown() {
96 _status = ConnectionPool::kConnectionStateUnknown;
97 }
98
id() const99 size_t ConnectionImpl::id() const {
100 return _id;
101 }
102
getHostAndPort() const103 const HostAndPort& ConnectionImpl::getHostAndPort() const {
104 return _hostAndPort;
105 }
106
isHealthy()107 bool ConnectionImpl::isHealthy() {
108 return true;
109 }
110
clear()111 void ConnectionImpl::clear() {
112 _setupQueue.clear();
113 _refreshQueue.clear();
114 _pushSetupQueue.clear();
115 _pushRefreshQueue.clear();
116 }
117
pushSetup(PushSetupCallback status)118 void ConnectionImpl::pushSetup(PushSetupCallback status) {
119 _pushSetupQueue.push_back(status);
120
121 if (_setupQueue.size()) {
122 auto connPtr = _setupQueue.front();
123 auto callback = _pushSetupQueue.front();
124 _setupQueue.pop_front();
125 _pushSetupQueue.pop_front();
126
127 auto cb = connPtr->_setupCallback;
128 cb(connPtr, callback());
129 }
130 }
131
pushSetup(Status status)132 void ConnectionImpl::pushSetup(Status status) {
133 pushSetup([status]() { return status; });
134 }
135
setupQueueDepth()136 size_t ConnectionImpl::setupQueueDepth() {
137 return _setupQueue.size();
138 }
139
pushRefresh(PushRefreshCallback status)140 void ConnectionImpl::pushRefresh(PushRefreshCallback status) {
141 _pushRefreshQueue.push_back(status);
142
143 if (_refreshQueue.size()) {
144 auto connPtr = _refreshQueue.front();
145 auto callback = _pushRefreshQueue.front();
146
147 _refreshQueue.pop_front();
148 _pushRefreshQueue.pop_front();
149
150 auto cb = connPtr->_refreshCallback;
151 cb(connPtr, callback());
152 }
153 }
154
pushRefresh(Status status)155 void ConnectionImpl::pushRefresh(Status status) {
156 pushRefresh([status]() { return status; });
157 }
158
refreshQueueDepth()159 size_t ConnectionImpl::refreshQueueDepth() {
160 return _refreshQueue.size();
161 }
162
getLastUsed() const163 Date_t ConnectionImpl::getLastUsed() const {
164 return _lastUsed;
165 }
166
getStatus() const167 const Status& ConnectionImpl::getStatus() const {
168 return _status;
169 }
170
setTimeout(Milliseconds timeout,TimeoutCallback cb)171 void ConnectionImpl::setTimeout(Milliseconds timeout, TimeoutCallback cb) {
172 _timer.setTimeout(timeout, cb);
173 }
174
cancelTimeout()175 void ConnectionImpl::cancelTimeout() {
176 _timer.cancelTimeout();
177 }
178
setup(Milliseconds timeout,SetupCallback cb)179 void ConnectionImpl::setup(Milliseconds timeout, SetupCallback cb) {
180 _setupCallback = std::move(cb);
181
182 _timer.setTimeout(timeout, [this] {
183 _setupCallback(this, Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, "timeout"));
184 });
185
186 _setupQueue.push_back(this);
187
188 if (_pushSetupQueue.size()) {
189 auto connPtr = _setupQueue.front();
190 auto callback = _pushSetupQueue.front();
191 _setupQueue.pop_front();
192 _pushSetupQueue.pop_front();
193
194 auto refreshCb = connPtr->_setupCallback;
195 refreshCb(connPtr, callback());
196 }
197 }
198
refresh(Milliseconds timeout,RefreshCallback cb)199 void ConnectionImpl::refresh(Milliseconds timeout, RefreshCallback cb) {
200 _refreshCallback = std::move(cb);
201
202 _timer.setTimeout(timeout, [this] {
203 _refreshCallback(this, Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, "timeout"));
204 });
205
206 _refreshQueue.push_back(this);
207
208 if (_pushRefreshQueue.size()) {
209 auto connPtr = _refreshQueue.front();
210 auto callback = _pushRefreshQueue.front();
211
212 _refreshQueue.pop_front();
213 _pushRefreshQueue.pop_front();
214
215 auto refreshCb = connPtr->_refreshCallback;
216 refreshCb(connPtr, callback());
217 }
218 }
219
getGeneration() const220 size_t ConnectionImpl::getGeneration() const {
221 return _generation;
222 }
223
224 std::deque<ConnectionImpl::PushSetupCallback> ConnectionImpl::_pushSetupQueue;
225 std::deque<ConnectionImpl::PushRefreshCallback> ConnectionImpl::_pushRefreshQueue;
226 std::deque<ConnectionImpl*> ConnectionImpl::_setupQueue;
227 std::deque<ConnectionImpl*> ConnectionImpl::_refreshQueue;
228 size_t ConnectionImpl::_idCounter = 1;
229
makeConnection(const HostAndPort & hostAndPort,size_t generation)230 std::unique_ptr<ConnectionPool::ConnectionInterface> PoolImpl::makeConnection(
231 const HostAndPort& hostAndPort, size_t generation) {
232 return stdx::make_unique<ConnectionImpl>(hostAndPort, generation, this);
233 }
234
makeTimer()235 std::unique_ptr<ConnectionPool::TimerInterface> PoolImpl::makeTimer() {
236 return stdx::make_unique<TimerImpl>(this);
237 }
238
now()239 Date_t PoolImpl::now() {
240 return _now.get_value_or(Date_t::now());
241 }
242
setNow(Date_t now)243 void PoolImpl::setNow(Date_t now) {
244 _now = now;
245 TimerImpl::fireIfNecessary();
246 }
247
248 boost::optional<Date_t> PoolImpl::_now;
249
250 } // namespace connection_pool_test_details
251 } // namespace executor
252 } // namespace mongo
253