1 // Copyright (C) 2013-2021 Internet Systems Consortium, Inc. ("ISC")
2 //
3 // This Source Code Form is subject to the terms of the Mozilla Public
4 // License, v. 2.0. If a copy of the MPL was not distributed with this
5 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7 #include <config.h>
8 #include <d2/d2_queue_mgr.h>
9 #include <d2srv/d2_log.h>
10 #include <dhcp_ddns/ncr_udp.h>
11
12 namespace isc {
13 namespace d2 {
14
15 // Makes constant visible to Google test macros.
16 const size_t D2QueueMgr::MAX_QUEUE_DEFAULT;
17
D2QueueMgr(asiolink::IOServicePtr & io_service,const size_t max_queue_size)18 D2QueueMgr::D2QueueMgr(asiolink::IOServicePtr& io_service, const size_t max_queue_size)
19 : io_service_(io_service), max_queue_size_(max_queue_size),
20 mgr_state_(NOT_INITTED), target_stop_state_(NOT_INITTED) {
21 if (!io_service_) {
22 isc_throw(D2QueueMgrError, "IOServicePtr cannot be null");
23 }
24
25 // Use setter to do validation.
26 setMaxQueueSize(max_queue_size);
27 }
28
~D2QueueMgr()29 D2QueueMgr::~D2QueueMgr() {
30 }
31
32 void
operator ()(const dhcp_ddns::NameChangeListener::Result result,dhcp_ddns::NameChangeRequestPtr & ncr)33 D2QueueMgr::operator()(const dhcp_ddns::NameChangeListener::Result result,
34 dhcp_ddns::NameChangeRequestPtr& ncr) {
35 try {
36 // Note that error conditions must be handled here without throwing
37 // exceptions. Remember this is the application level "link" in the
38 // callback chain. Throwing an exception here will "break" the
39 // io_service "run" we are operating under. With that in mind,
40 // if we hit a problem, we will stop the listener transition to
41 // the appropriate stopped state. Upper layer(s) must monitor our
42 // state as well as our queue size.
43 switch (result) {
44 case dhcp_ddns::NameChangeListener::SUCCESS:
45 // Receive was successful, attempt to queue the request.
46 if (getQueueSize() < getMaxQueueSize()) {
47 // There's room on the queue, add to the end
48 enqueue(ncr);
49
50 // Log that we got the request
51 LOG_DEBUG(dhcp_to_d2_logger,
52 isc::log::DBGLVL_TRACE_DETAIL_DATA,
53 DHCP_DDNS_QUEUE_MGR_QUEUE_RECEIVE)
54 .arg(ncr->getRequestId());
55 return;
56 }
57
58 // Queue is full, stop the listener.
59 // Note that we can move straight to a STOPPED state as there
60 // is no receive in progress.
61 LOG_ERROR(dhcp_to_d2_logger, DHCP_DDNS_QUEUE_MGR_QUEUE_FULL)
62 .arg(max_queue_size_);
63 stopListening(STOPPED_QUEUE_FULL);
64 break;
65
66 case dhcp_ddns::NameChangeListener::STOPPED:
67 if (mgr_state_ == STOPPING) {
68 // This is confirmation that the listener has stopped and its
69 // callback will not be called again, unless its restarted.
70 updateStopState();
71 } else {
72 // We should not get a receive complete status of stopped
73 // unless we canceled the read as part of stopping. Therefore
74 // this is unexpected so we will treat it as a receive error.
75 // This is most likely an unforeseen programmatic issue.
76 LOG_ERROR(dhcp_to_d2_logger, DHCP_DDNS_QUEUE_MGR_UNEXPECTED_STOP)
77 .arg(mgr_state_);
78 stopListening(STOPPED_RECV_ERROR);
79 }
80
81 break;
82
83 default:
84 // Receive failed, stop the listener.
85 // Note that we can move straight to a STOPPED state as there
86 // is no receive in progress.
87 LOG_ERROR(dhcp_to_d2_logger, DHCP_DDNS_QUEUE_MGR_RECV_ERROR);
88 stopListening(STOPPED_RECV_ERROR);
89 break;
90 }
91 } catch (const std::exception& ex) {
92 // On the outside chance a throw occurs, let's log it and swallow it.
93 LOG_ERROR(dhcp_to_d2_logger, DHCP_DDNS_QUEUE_MGR_UNEXPECTED_HANDLER_ERROR)
94 .arg(ex.what());
95 }
96 }
97
98 void
initUDPListener(const isc::asiolink::IOAddress & ip_address,const uint32_t port,const dhcp_ddns::NameChangeFormat format,const bool reuse_address)99 D2QueueMgr::initUDPListener(const isc::asiolink::IOAddress& ip_address,
100 const uint32_t port,
101 const dhcp_ddns::NameChangeFormat format,
102 const bool reuse_address) {
103
104 if (listener_) {
105 isc_throw(D2QueueMgrError,
106 "D2QueueMgr listener is already initialized");
107 }
108
109 // Instantiate a UDP listener and set state to INITTED.
110 // Note UDP listener constructor does not throw.
111 listener_.reset(new dhcp_ddns::
112 NameChangeUDPListener(ip_address, port, format, *this,
113 reuse_address));
114 mgr_state_ = INITTED;
115 }
116
117 void
startListening()118 D2QueueMgr::startListening() {
119 // We can't listen if we haven't initialized the listener yet.
120 if (!listener_) {
121 isc_throw(D2QueueMgrError, "D2QueueMgr "
122 "listener is not initialized, cannot start listening");
123 }
124
125 // If we are already listening, we do not want to "reopen" the listener
126 // and really we shouldn't be trying.
127 if (mgr_state_ == RUNNING) {
128 isc_throw(D2QueueMgrError, "D2QueueMgr "
129 "cannot call startListening from the RUNNING state");
130 }
131
132 // Instruct the listener to start listening and set state accordingly.
133 try {
134 listener_->startListening(*io_service_);
135 mgr_state_ = RUNNING;
136 } catch (const isc::Exception& ex) {
137 isc_throw(D2QueueMgrError, "D2QueueMgr listener start failed: "
138 << ex.what());
139 }
140
141 LOG_DEBUG(d2_logger, isc::log::DBGLVL_START_SHUT,
142 DHCP_DDNS_QUEUE_MGR_STARTED);
143 }
144
145 void
stopListening(const State target_stop_state)146 D2QueueMgr::stopListening(const State target_stop_state) {
147 if (listener_) {
148 // Enforce only valid "stop" states.
149 // This is purely a programmatic error and should never happen.
150 if (target_stop_state != STOPPED &&
151 target_stop_state != STOPPED_QUEUE_FULL &&
152 target_stop_state != STOPPED_RECV_ERROR) {
153 isc_throw(D2QueueMgrError,
154 "D2QueueMgr invalid value for stop state: "
155 << target_stop_state);
156 }
157
158 // Remember the state we want to achieve.
159 target_stop_state_ = target_stop_state;
160
161 // Instruct the listener to stop. If the listener reports that it
162 // has IO pending, then we transition to STOPPING to wait for the
163 // cancellation event. Otherwise, we can move directly to the targeted
164 // state.
165 listener_->stopListening();
166 if (listener_->isIoPending()) {
167 mgr_state_ = STOPPING;
168 } else {
169 updateStopState();
170 }
171 }
172 }
173
174 void
updateStopState()175 D2QueueMgr::updateStopState() {
176 mgr_state_ = target_stop_state_;
177 LOG_DEBUG(d2_logger, isc::log::DBGLVL_TRACE_BASIC,
178 DHCP_DDNS_QUEUE_MGR_STOPPED);
179 }
180
181
182 void
removeListener()183 D2QueueMgr::removeListener() {
184 // Force our managing layer(s) to stop us properly first.
185 if (mgr_state_ == RUNNING) {
186 isc_throw(D2QueueMgrError,
187 "D2QueueMgr cannot delete listener while state is RUNNING");
188 }
189
190 listener_.reset();
191 mgr_state_ = NOT_INITTED;
192 }
193
194 const dhcp_ddns::NameChangeRequestPtr&
peek() const195 D2QueueMgr::peek() const {
196 if (getQueueSize() == 0) {
197 isc_throw(D2QueueMgrQueueEmpty,
198 "D2QueueMgr peek attempted on an empty queue");
199 }
200
201 return (ncr_queue_.front());
202 }
203
204 const dhcp_ddns::NameChangeRequestPtr&
peekAt(const size_t index) const205 D2QueueMgr::peekAt(const size_t index) const {
206 if (index >= getQueueSize()) {
207 isc_throw(D2QueueMgrInvalidIndex,
208 "D2QueueMgr peek beyond end of queue attempted"
209 << " index: " << index << " queue size: " << getQueueSize());
210 }
211
212 return (ncr_queue_.at(index));
213 }
214
215 void
dequeueAt(const size_t index)216 D2QueueMgr::dequeueAt(const size_t index) {
217 if (index >= getQueueSize()) {
218 isc_throw(D2QueueMgrInvalidIndex,
219 "D2QueueMgr dequeue beyond end of queue attempted"
220 << " index: " << index << " queue size: " << getQueueSize());
221 }
222
223 RequestQueue::iterator pos = ncr_queue_.begin() + index;
224 ncr_queue_.erase(pos);
225 }
226
227
228 void
dequeue()229 D2QueueMgr::dequeue() {
230 if (getQueueSize() == 0) {
231 isc_throw(D2QueueMgrQueueEmpty,
232 "D2QueueMgr dequeue attempted on an empty queue");
233 }
234
235 ncr_queue_.pop_front();
236 }
237
238 void
enqueue(dhcp_ddns::NameChangeRequestPtr & ncr)239 D2QueueMgr::enqueue(dhcp_ddns::NameChangeRequestPtr& ncr) {
240 ncr_queue_.push_back(ncr);
241 }
242
243 void
clearQueue()244 D2QueueMgr::clearQueue() {
245 ncr_queue_.clear();
246 }
247
248 void
setMaxQueueSize(const size_t new_queue_max)249 D2QueueMgr::setMaxQueueSize(const size_t new_queue_max) {
250 if (new_queue_max < 1) {
251 isc_throw(D2QueueMgrError,
252 "D2QueueMgr maximum queue size must be greater than zero");
253 }
254
255 if (new_queue_max < getQueueSize()) {
256 isc_throw(D2QueueMgrError, "D2QueueMgr maximum queue size value cannot"
257 " be less than the current queue size :" << getQueueSize());
258 }
259
260 max_queue_size_ = new_queue_max;
261 }
262
263 } // namespace isc::d2
264 } // namespace isc
265