1 // Copyright (C) 2021 Internet Systems Consortium, Inc. ("ISC")
2 //
3 // This Source Code Form is subject to the terms of the Mozilla Public
4 // License, v. 2.0. If a copy of the MPL was not distributed with this
5 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7 #include <config.h>
8
9 #include <asiolink/asio_wrapper.h>
10 #include <asiolink/interval_timer.h>
11 #include <cc/data.h>
12 #include <http/client.h>
13 #include <http/listener.h>
14 #include <http/post_request_json.h>
15 #include <http/response_creator.h>
16 #include <http/response_creator_factory.h>
17 #include <http/response_json.h>
18 #include <http/url.h>
19 #include <util/multi_threading_mgr.h>
20 #include <testutils/gtest_utils.h>
21
22 #include <boost/asio/ip/tcp.hpp>
23 #include <boost/pointer_cast.hpp>
24 #include <gtest/gtest.h>
25
26 #include <functional>
27 #include <sstream>
28 #include <string>
29
30 using namespace isc;
31 using namespace isc::asiolink;
32 using namespace isc::data;
33 using namespace isc::http;
34 using namespace isc::util;
35 namespace ph = std::placeholders;
36
37 namespace {
38
39 /// @brief IP address to which HTTP service is bound.
40 const std::string SERVER_ADDRESS = "127.0.0.1";
41
42 /// @brief Port number to which HTTP service is bound.
43 const unsigned short SERVER_PORT = 18123;
44
45 /// @brief Test timeout (ms).
46 const long TEST_TIMEOUT = 10000;
47
48 /// @brief Container request/response pair handled by a given thread.
49 struct ClientRR {
50 /// @brief Thread id of the client thread handling the request as a string.
51 std::string thread_id_;
52
53 /// @brief HTTP request submitted by the client thread.
54 PostHttpRequestJsonPtr request_;
55
56 /// @brief HTTP response received by the client thread.
57 HttpResponseJsonPtr response_;
58 };
59
60 /// @brief Pointer to a ClientRR instance.
61 typedef boost::shared_ptr<ClientRR> ClientRRPtr;
62
63 /// @brief Implementation of the @ref HttpResponseCreator.
64 ///
65 /// Creates a response to a request containing body content
66 /// as follows:
67 ///
68 /// ```
69 /// { "sequence" : nnnn }
70 /// ```
71 ///
72 /// The response will include the sequence number of the request
73 /// as well as the server port passed into the creator's constructor:
74 ///
75 /// ```
76 /// { "sequence": nnnn, "server-port": xxxx }
77 /// ```
78 class TestHttpResponseCreator : public HttpResponseCreator {
79 public:
80 /// @brief Constructor
81 ///
82 /// @param server_port integer value the server listens upon, it is
83 /// echoed back in responses as "server-port".
TestHttpResponseCreator(uint16_t server_port)84 TestHttpResponseCreator(uint16_t server_port)
85 : server_port_(server_port) { }
86
87 /// @brief Create a new request.
88 ///
89 /// @return Pointer to the new instance of the @ref HttpRequest.
90 virtual HttpRequestPtr
createNewHttpRequest() const91 createNewHttpRequest() const {
92 return (HttpRequestPtr(new PostHttpRequestJson()));
93 }
94
95 private:
96 /// @brief Creates HTTP response.
97 ///
98 /// @param request Pointer to the HTTP request.
99 /// @param status_code status code to include in the response.
100 ///
101 /// @return Pointer to the generated HTTP response.
102 virtual HttpResponsePtr
createStockHttpResponse(const HttpRequestPtr & request,const HttpStatusCode & status_code) const103 createStockHttpResponse(const HttpRequestPtr& request,
104 const HttpStatusCode& status_code) const {
105 // The request hasn't been finalized so the request object
106 // doesn't contain any information about the HTTP version number
107 // used. But, the context should have this data (assuming the
108 // HTTP version is parsed OK).
109 HttpVersion http_version(request->context()->http_version_major_,
110 request->context()->http_version_minor_);
111 // This will generate the response holding JSON content.
112 HttpResponseJsonPtr response(new HttpResponseJson(http_version, status_code));
113 response->finalize();
114 return (response);
115 }
116
117 /// @brief Creates HTTP response.
118 ///
119 /// Generates a response which echoes the requests sequence
120 /// number as well as the creator's server port value. Responses
121 /// should appear as follows:
122 ///
123 /// ```
124 /// { "sequence" : nnnn }
125 /// ```
126 ///
127 /// @param request Pointer to the HTTP request.
128 /// @return Pointer to the generated HTTP OK response with no content.
129 virtual HttpResponsePtr
createDynamicHttpResponse(HttpRequestPtr request)130 createDynamicHttpResponse(HttpRequestPtr request) {
131 // Request must always be JSON.
132 PostHttpRequestJsonPtr request_json =
133 boost::dynamic_pointer_cast<PostHttpRequestJson>(request);
134 if (!request_json) {
135 return (createStockHttpResponse(request, HttpStatusCode::BAD_REQUEST));
136 }
137
138 // Extract the sequence from the request.
139 ConstElementPtr sequence = request_json->getJsonElement("sequence");
140 if (!sequence) {
141 return (createStockHttpResponse(request, HttpStatusCode::BAD_REQUEST));
142 }
143
144 // Create the response.
145 HttpResponseJsonPtr response(new HttpResponseJson(request->getHttpVersion(),
146 HttpStatusCode::OK));
147 // Construct the body.
148 ElementPtr body = Element::createMap();
149 body->set("server-port", Element::create(server_port_));
150 body->set("sequence", sequence);
151
152 // Echo request body back in the response.
153 response->setBodyAsJson(body);
154
155 response->finalize();
156 return (response);
157 }
158
159 /// @brief Port upon which this creator's server is listening.
160 ///
161 /// The intent is to use the value to determine which server generated
162 /// a given response.
163 uint16_t server_port_;
164 };
165
166 /// @brief Implementation of the test @ref HttpResponseCreatorFactory.
167 ///
168 /// This factory class creates @ref TestHttpResponseCreator instances.
169 class TestHttpResponseCreatorFactory : public HttpResponseCreatorFactory {
170 public:
171
172 /// @brief Constructor
173 ///
174 /// @param server_port port upon with the server is listening. This
175 /// value will be included in responses such that each response
176 /// can be attributed to a specific server.
TestHttpResponseCreatorFactory(uint16_t server_port)177 TestHttpResponseCreatorFactory(uint16_t server_port)
178 : server_port_(server_port) {};
179
180 /// @brief Creates @ref TestHttpResponseCreator instance.
create() const181 virtual HttpResponseCreatorPtr create() const {
182 HttpResponseCreatorPtr response_creator(new TestHttpResponseCreator(server_port_));
183 return (response_creator);
184 }
185
186 /// @brief Port upon which this factory's server is listening.
187 ///
188 /// The intent is to use the value to determine which server generated
189 /// a given response.
190 uint16_t server_port_;
191 };
192
193 /// @brief Test fixture class for testing threading modes of HTTP client.
194 class MultiThreadingHttpClientTest : public ::testing::Test {
195 public:
196
197 /// @brief Constructor.
MultiThreadingHttpClientTest()198 MultiThreadingHttpClientTest()
199 : io_service_(), client_(), listener_(), factory_(), listeners_(), factories_(),
200 test_timer_(io_service_), num_threads_(0), num_batches_(0), num_listeners_(0),
201 expected_requests_(0), num_in_progress_(0), num_finished_(0), paused_(false),
202 pause_cnt_(0) {
203 test_timer_.setup(std::bind(&MultiThreadingHttpClientTest::timeoutHandler, this, true),
204 TEST_TIMEOUT, IntervalTimer::ONE_SHOT);
205 MultiThreadingMgr::instance().setMode(true);
206 }
207
208 /// @brief Destructor.
~MultiThreadingHttpClientTest()209 ~MultiThreadingHttpClientTest() {
210 // Stop the client.
211 if (client_) {
212 client_->stop();
213 }
214
215 // Stop all listeners.
216 for (const auto& listener : listeners_) {
217 listener->stop();
218 }
219
220 MultiThreadingMgr::instance().setMode(false);
221 }
222
223 /// @brief Callback function to invoke upon test timeout.
224 ///
225 /// It stops the IO service and reports test timeout.
226 ///
227 /// @param fail_on_timeout Specifies if test failure should be reported.
timeoutHandler(const bool fail_on_timeout)228 void timeoutHandler(const bool fail_on_timeout) {
229 if (fail_on_timeout) {
230 ADD_FAILURE() << "Timeout occurred while running the test!";
231 }
232 io_service_.stop();
233 }
234
235 /// @brief Runs the test's IOService until the desired number of requests
236 /// have been carried out or the test fails.
runIOService(size_t request_limit)237 void runIOService(size_t request_limit) {
238 while (getRRCount() < request_limit) {
239 // Always call reset() before we call run();
240 io_service_.restart();
241
242 // Run until a client stops the service.
243 io_service_.run();
244 }
245 }
246
247 /// @brief Creates an HTTP request with JSON body.
248 ///
249 /// It includes a JSON parameter with a specified value.
250 ///
251 /// @param parameter_name JSON parameter to be included.
252 /// @param value JSON parameter value.
253 /// @param version HTTP version to be used. Default is HTTP/1.1.
254 template<typename ValueType>
createRequest(const std::string & parameter_name,const ValueType & value,const HttpVersion & version=HttpVersion (1,1))255 PostHttpRequestJsonPtr createRequest(const std::string& parameter_name,
256 const ValueType& value,
257 const HttpVersion& version = HttpVersion(1, 1)) {
258 // Create POST request with JSON body.
259 PostHttpRequestJsonPtr request(new PostHttpRequestJson(HttpRequest::Method::HTTP_POST,
260 "/boo", version));
261 // Body is a map with a specified parameter included.
262 ElementPtr body = Element::createMap();
263 body->set(parameter_name, Element::create(value));
264 request->setBodyAsJson(body);
265 try {
266 request->finalize();
267 } catch (const std::exception& ex) {
268 ADD_FAILURE() << "failed to create request: " << ex.what();
269 }
270
271 return (request);
272 }
273
274 /// @brief Test that worker threads are not permitted to change thread pool
275 /// state.
testIllegalThreadPoolActions()276 void testIllegalThreadPoolActions() {
277 ASSERT_THROW(client_->start(), MultiThreadingInvalidOperation);
278 ASSERT_THROW(client_->pause(), MultiThreadingInvalidOperation);
279 ASSERT_THROW(client_->resume(), MultiThreadingInvalidOperation);
280 }
281
282 /// @brief Initiates a single HTTP request.
283 ///
284 /// Constructs an HTTP post whose body is a JSON map containing a
285 /// single integer element, "sequence".
286 ///
287 /// The request completion handler will block each requesting thread
288 /// until the number of in-progress threads reaches the number of
289 /// threads in the pool. At that point, the handler will unblock
290 /// until all threads have finished preparing their response and are
291 /// ready to return. The handler will then notify all pending threads
292 /// and invoke stop() on the test's main IO service thread.
293 ///
294 /// @param sequence value for the integer element, "sequence",
295 /// to send in the request.
startRequest(int sequence,int port_offset=0)296 void startRequest(int sequence, int port_offset = 0) {
297 // Create the URL on which the server can be reached.
298 std::stringstream ss;
299 ss << "http://" << SERVER_ADDRESS << ":" << (SERVER_PORT + port_offset);
300 Url url(ss.str());
301
302 // Initiate request to the server.
303 PostHttpRequestJsonPtr request_json = createRequest("sequence", sequence);
304 HttpResponseJsonPtr response_json = boost::make_shared<HttpResponseJson>();
305 ASSERT_NO_THROW(client_->asyncSendRequest(url, TlsContextPtr(),
306 request_json, response_json,
307 [this, request_json, response_json](const boost::system::error_code& ec,
308 const HttpResponsePtr&,
309 const std::string&) {
310 // Bail on an error.
311 ASSERT_FALSE(ec) << "asyncSendRequest failed, ec: " << ec;
312
313 // Wait here until we have as many in progress as we have threads.
314 {
315 std::unique_lock<std::mutex> lck(test_mutex_);
316 ++num_in_progress_;
317 if (num_threads_ == 0 || num_in_progress_ == num_threads_) {
318 // Everybody has one, let's go.
319 num_finished_ = 0;
320 test_cv_.notify_all();
321 } else {
322 // I'm ready but others aren't wait here.
323 bool ret = test_cv_.wait_for(lck, std::chrono::seconds(10),
324 [&]() { return (num_in_progress_ == num_threads_); });
325 if (!ret) {
326 ADD_FAILURE() << "clients failed to start work";
327 }
328 }
329 }
330
331 // If running on multiple threads, threads should be prohibited from
332 // changing the thread pool state.
333 if (num_threads_) {
334 testIllegalThreadPoolActions();
335 }
336
337 // Get stringified thread-id.
338 std::stringstream ss;
339 ss << std::this_thread::get_id();
340
341 // Create the ClientRR.
342 ClientRRPtr clientRR(new ClientRR());
343 clientRR->thread_id_ = ss.str();
344 clientRR->request_ = request_json;
345 clientRR->response_ = response_json;
346
347 // Wait here until we have as many ready to finish as we have threads.
348 {
349 std::unique_lock<std::mutex> lck(test_mutex_);
350 ++num_finished_;
351 clientRRs_.push_back(clientRR);
352 if (num_threads_ == 0 || num_finished_ == num_threads_) {
353 // We're all done, notify the others and finish.
354 num_in_progress_ = 0;
355 test_cv_.notify_all();
356 // Stop the test's IOService.
357 io_service_.stop();
358 } else {
359 // I'm done but others aren't wait here.
360 bool ret = test_cv_.wait_for(lck, std::chrono::seconds(10),
361 [&]() { return (num_finished_ == num_threads_); });
362 if (!ret) {
363 ADD_FAILURE() << "clients failed to finish work";
364 }
365 }
366 }
367 }));
368 }
369
370 /// @brief Initiates a single HTTP request.
371 ///
372 /// Constructs an HTTP post whose body is a JSON map containing a
373 /// single integer element, "sequence".
374 ///
375 /// The request completion handler simply constructs the response,
376 /// and adds it the list of completed request/responses. If the
377 /// number of completed requests has reached the expected number
378 /// it stops the test IOService.
379 ///
380 /// @param sequence value for the integer element, "sequence",
381 /// to send in the request.
startRequestSimple(int sequence,int port_offset=0)382 void startRequestSimple(int sequence, int port_offset = 0) {
383 // Create the URL on which the server can be reached.
384 std::stringstream ss;
385 ss << "http://" << SERVER_ADDRESS << ":" << (SERVER_PORT + port_offset);
386 Url url(ss.str());
387
388 // Initiate request to the server.
389 PostHttpRequestJsonPtr request_json = createRequest("sequence", sequence);
390 HttpResponseJsonPtr response_json = boost::make_shared<HttpResponseJson>();
391 ASSERT_NO_THROW(client_->asyncSendRequest(url, TlsContextPtr(),
392 request_json, response_json,
393 [this, request_json, response_json](const boost::system::error_code& ec,
394 const HttpResponsePtr&,
395 const std::string&) {
396 // Bail on an error.
397 ASSERT_FALSE(ec) << "asyncSendRequest failed, ec: " << ec;
398
399 // Get stringified thread-id.
400 std::stringstream ss;
401 ss << std::this_thread::get_id();
402
403 // Create the ClientRR.
404 ClientRRPtr clientRR(new ClientRR());
405 clientRR->thread_id_ = ss.str();
406 clientRR->request_ = request_json;
407 clientRR->response_ = response_json;
408
409 {
410 std::unique_lock<std::mutex> lck(test_mutex_);
411 clientRRs_.push_back(clientRR);
412 ++num_finished_;
413 if ((num_finished_ >= expected_requests_) && !io_service_.stopped()) {
414 io_service_.stop();
415 }
416 }
417
418 }));
419 }
420
421 /// @brief Carries out HTTP requests via HttpClient to HTTP listener(s).
422 ///
423 /// This function creates one HttpClient with the given number
424 /// of threads and then the given number of HttpListeners. It then
425 /// initiates the given number of request batches where each batch
426 /// contains one request per thread per listener.
427 ///
428 /// Then it iteratively runs the test's IOService until all
429 /// the requests have been responded to, an error occurs, or the
430 /// test times out.
431 ///
432 /// Each request carries a single integer element, "sequence", which
433 /// uniquely identifies the request. Each response is expected to
434 /// contain this value echoed back along with the listener's server
435 /// port number. Thus each response can be matched to it's request
436 /// and to the listener that handled the request.
437 ///
438 /// After all requests have been conducted, the function verifies
439 /// that:
440 ///
441 /// 1. The number of requests conducted is correct
442 /// 2. The sequence numbers in request-response pairs match
443 /// 3. Each client thread handled the same number of requests
444 /// 4. Each listener handled the same number of requests
445 ///
446 /// @param num_threads number of threads the HttpClient should use.
447 /// A value of 0 puts the HttpClient in single-threaded mode.
448 /// @param num_batches number of batches of requests that should be
449 /// conducted.
450 /// @param num_listeners number of HttpListeners to create. Defaults
451 /// to 1.
threadRequestAndReceive(size_t num_threads,size_t num_batches,size_t num_listeners=1)452 void threadRequestAndReceive(size_t num_threads, size_t num_batches,
453 size_t num_listeners = 1) {
454 ASSERT_TRUE(num_batches);
455 ASSERT_TRUE(num_listeners);
456 num_threads_ = num_threads;
457 num_batches_ = num_batches;
458 num_listeners_ = num_listeners;
459
460 // Client in ST is, in effect, 1 thread.
461 size_t effective_threads = (num_threads_ == 0 ? 1 : num_threads_);
462
463 // Calculate the expected number of requests.
464 expected_requests_ = (num_batches_ * num_listeners_ * effective_threads);
465
466 for (auto i = 0; i < num_listeners_; ++i) {
467 // Make a factory
468 HttpResponseCreatorFactoryPtr factory(new TestHttpResponseCreatorFactory(SERVER_PORT + i));
469 factories_.push_back(factory);
470
471 // Need to create a Listener on
472 HttpListenerPtr listener(new HttpListener(io_service_,
473 IOAddress(SERVER_ADDRESS), (SERVER_PORT + i),
474 TlsContextPtr(), factory,
475 HttpListener::RequestTimeout(10000),
476 HttpListener::IdleTimeout(10000)));
477 listeners_.push_back(listener);
478
479 // Start the server.
480 ASSERT_NO_THROW(listener->start());
481 }
482
483 // Create an MT client with num_threads
484 ASSERT_NO_THROW_LOG(client_.reset(new HttpClient(io_service_, num_threads)));
485 ASSERT_TRUE(client_);
486
487 if (num_threads_ == 0) {
488 // If we single-threaded client should not have it's own IOService.
489 ASSERT_FALSE(client_->getThreadIOService());
490 } else {
491 // If we multi-threaded client should have it's own IOService.
492 ASSERT_TRUE(client_->getThreadIOService());
493 }
494
495 // Verify the pool size and number of threads are as expected.
496 ASSERT_EQ(client_->getThreadPoolSize(), num_threads);
497 ASSERT_EQ(client_->getThreadCount(), num_threads);
498
499 // Start the requisite number of requests:
500 // batch * listeners * threads.
501 int sequence = 0;
502 for (auto b = 0; b < num_batches; ++b) {
503 for (auto l = 0; l < num_listeners_; ++l) {
504 for (auto t = 0; t < effective_threads; ++t) {
505 startRequest(++sequence, l);
506 }
507 }
508 }
509
510 // Loop until the clients are done, an error occurs, or the time runs out.
511 runIOService(expected_requests_);
512
513 // Client should stop without issue.
514 ASSERT_NO_THROW(client_->stop());
515
516 // Listeners should stop without issue.
517 for (const auto& listener : listeners_) {
518 ASSERT_NO_THROW(listener->stop());
519 }
520
521 // We should have a response for each request.
522 ASSERT_EQ(getRRCount(), expected_requests_);
523
524 // Create a map to track number of responses for each client thread.
525 std::map<std::string, int> responses_per_thread;
526
527 // Create a map to track number of responses for each listener port.
528 std::map<uint16_t, int> responses_per_listener;
529
530 // Get the stringified thread-id of the test's main thread.
531 std::stringstream ss;
532 ss << std::this_thread::get_id();
533 std::string main_thread_id = ss.str();
534
535 // Iterate over the client request/response pairs.
536 for (auto const& clientRR : clientRRs_) {
537 // Make sure it's whole.
538 ASSERT_FALSE(clientRR->thread_id_.empty());
539 ASSERT_TRUE(clientRR->request_);
540 ASSERT_TRUE(clientRR->response_);
541
542 // Request should contain an integer sequence number.
543 int request_sequence;
544 ConstElementPtr sequence = clientRR->request_->getJsonElement("sequence");
545 ASSERT_TRUE(sequence);
546 ASSERT_NO_THROW(request_sequence = sequence->intValue());
547
548 // Response should contain an integer sequence number.
549 int response_sequence;
550 sequence = clientRR->response_->getJsonElement("sequence");
551 ASSERT_TRUE(sequence);
552 ASSERT_NO_THROW(response_sequence = sequence->intValue());
553
554 // Request and Response sequence numbers should match.
555 ASSERT_EQ(request_sequence, response_sequence);
556
557 ConstElementPtr server_port_elem = clientRR->response_->getJsonElement("server-port");
558 ASSERT_TRUE(server_port_elem);
559 uint16_t server_port = server_port_elem->intValue();
560
561 if (num_threads_ == 0) {
562 // For ST mode thread id should always be the main thread.
563 ASSERT_EQ(clientRR->thread_id_, main_thread_id);
564 } else {
565 // For MT mode the thread id should never be the main thread.
566 ASSERT_NE(clientRR->thread_id_, main_thread_id);
567 }
568
569 // Bump the response count for the given client thread-id.
570 auto rit = responses_per_thread.find(clientRR->thread_id_);
571 if (rit != responses_per_thread.end()) {
572 responses_per_thread[clientRR->thread_id_] = rit->second + 1;
573 } else {
574 responses_per_thread[clientRR->thread_id_] = 1;
575 }
576
577 // Bump the response count for the given server port.
578 auto lit = responses_per_listener.find(server_port);
579 if (lit != responses_per_listener.end()) {
580 responses_per_listener[server_port] = lit->second + 1;
581 } else {
582 responses_per_listener[server_port] = 1;
583 }
584 }
585
586 // Make sure that all client threads received responses.
587 ASSERT_EQ(responses_per_thread.size(), effective_threads);
588
589 // Make sure that each client thread received the same number of responses.
590 for (auto const& it : responses_per_thread) {
591 EXPECT_EQ(it.second, (num_batches_ * num_listeners_))
592 << "thread-id: " << it.first
593 << ", responses: " << it.second << std::endl;
594 }
595
596 // Make sure that all listeners generated responses.
597 ASSERT_EQ(responses_per_listener.size(), num_listeners_);
598
599 // Make sure Each listener generated the same number of responses.
600 for (auto const& it : responses_per_listener) {
601 EXPECT_EQ(it.second, (num_batches_ * effective_threads))
602 << "server-port: " << it.first
603 << ", responses: " << it.second << std::endl;
604 }
605 }
606
607 /// @brief Verifies the client can be paused and resumed repeatedly
608 /// while doing multi-threaded work.
609 ///
610 /// @param num_threads number of threads the HttpClient should use.
611 /// Must be greater than zero, this test does not make sense for a
612 /// single threaded client.
613 /// @param num_batches number of batches of requests that should be
614 /// conducted.
615 /// @param num_listeners number of HttpListeners to create.
616 /// @param num_pauses number of pauses to conduct.
workPauseResumeShutdown(size_t num_threads,size_t num_batches,size_t num_listeners,size_t num_pauses)617 void workPauseResumeShutdown(size_t num_threads, size_t num_batches,
618 size_t num_listeners, size_t num_pauses) {
619 ASSERT_TRUE(num_threads);
620 ASSERT_TRUE(num_batches);
621 ASSERT_TRUE(num_listeners);
622 num_threads_ = num_threads;
623 num_batches_ = num_batches;
624 num_listeners_ = num_listeners;
625
626 // Calculate the total expected number of requests.
627 size_t total_requests = (num_batches_ * num_listeners_ * num_threads_);
628
629 // Create the listeners.
630 for (auto i = 0; i < num_listeners_; ++i) {
631 // Make a factory
632 HttpResponseCreatorFactoryPtr factory(new TestHttpResponseCreatorFactory(SERVER_PORT + i));
633 factories_.push_back(factory);
634
635 // Need to create a Listener on
636 HttpListenerPtr listener(new HttpListener(io_service_,
637 IOAddress(SERVER_ADDRESS), (SERVER_PORT + i),
638 TlsContextPtr(), factory,
639 HttpListener::RequestTimeout(10000),
640 HttpListener::IdleTimeout(10000)));
641 listeners_.push_back(listener);
642
643 // Start the server.
644 ASSERT_NO_THROW(listener->start());
645 }
646
647 // Create an instant start, MT client with num_threads
648 ASSERT_NO_THROW_LOG(client_.reset(new HttpClient(io_service_, num_threads)));
649 ASSERT_TRUE(client_);
650
651 // Client should be running. Check convenience functions.
652 ASSERT_TRUE(client_->isRunning());
653 ASSERT_FALSE(client_->isPaused());
654 ASSERT_FALSE(client_->isStopped());
655
656 // Verify the pool size and number of threads are as expected.
657 ASSERT_EQ(client_->getThreadPoolSize(), num_threads);
658 ASSERT_EQ(client_->getThreadCount(), num_threads);
659
660 // Start the requisite number of requests:
661 // batch * listeners * threads.
662 int sequence = 0;
663 for (auto b = 0; b < num_batches; ++b) {
664 for (auto l = 0; l < num_listeners_; ++l) {
665 for (auto t = 0; t < num_threads_; ++t) {
666 startRequestSimple(++sequence, l);
667 }
668 }
669 }
670
671 size_t rr_count = 0;
672 while (rr_count < total_requests) {
673 size_t request_limit = (pause_cnt_ < num_pauses ?
674 (rr_count + ((total_requests - rr_count) / num_pauses))
675 : total_requests);
676
677 // Run test IOService until we hit the limit.
678 runIOService(request_limit);
679
680 // If we've done all our pauses we should be through.
681 if (pause_cnt_ == num_pauses) {
682 break;
683 }
684
685 // Pause the client.
686 ASSERT_NO_THROW(client_->pause());
687 ASSERT_TRUE(client_->isPaused());
688 ++pause_cnt_;
689
690 // Check our progress.
691 rr_count = getRRCount();
692 ASSERT_GE(rr_count, request_limit);
693
694 // Resume the client.
695 ASSERT_NO_THROW(client_->resume());
696 ASSERT_TRUE(client_->isRunning());
697 }
698
699 // Client should stop without issue.
700 ASSERT_NO_THROW(client_->stop());
701 ASSERT_TRUE(client_->isStopped());
702
703 // We should have finished all our requests.
704 ASSERT_EQ(getRRCount(), total_requests);
705
706 // Stopping again should be harmless.
707 ASSERT_NO_THROW(client_->stop());
708
709 // Listeners should stop without issue.
710 for (const auto& listener : listeners_) {
711 ASSERT_NO_THROW(listener->stop());
712 }
713
714 // Get the stringified thread-id of the test's main thread.
715 std::stringstream ss;
716 ss << std::this_thread::get_id();
717 std::string main_thread_id = ss.str();
718
719 // Tracks the number for requests fulfilled by main thread.
720 size_t worked_by_main = 0;
721
722 // Iterate over the client request/response pairs.
723 for (auto const& clientRR : clientRRs_) {
724 // Make sure it's whole.
725 ASSERT_FALSE(clientRR->thread_id_.empty());
726 ASSERT_TRUE(clientRR->request_);
727 ASSERT_TRUE(clientRR->response_);
728
729 // Request should contain an integer sequence number.
730 int request_sequence;
731 ConstElementPtr sequence = clientRR->request_->getJsonElement("sequence");
732 ASSERT_TRUE(sequence);
733 ASSERT_NO_THROW(request_sequence = sequence->intValue());
734
735 // Response should contain an integer sequence number.
736 int response_sequence;
737 sequence = clientRR->response_->getJsonElement("sequence");
738 ASSERT_TRUE(sequence);
739 ASSERT_NO_THROW(response_sequence = sequence->intValue());
740
741 // Request and Response sequence numbers should match.
742 ASSERT_EQ(request_sequence, response_sequence);
743
744 ConstElementPtr server_port_elem = clientRR->response_->getJsonElement("server-port");
745 ASSERT_TRUE(server_port_elem);
746
747 // Track how many requests were completed by the main thread.
748 // These can occur when pausing calls IOService::poll.
749 if (clientRR->thread_id_ == main_thread_id) {
750 ++worked_by_main;
751 }
752 }
753
754 // Make sure the majority of the requests were worked by
755 // worker threads. In theory, the number of calls to poll
756 // times the number of threads is the limit for responses
757 // built by the main thread.
758 ASSERT_LE(worked_by_main, num_pauses * num_threads);
759 }
760
761 /// @brief Fetch the number of completed requests.
762 ///
763 /// @return number of completed requests.
getRRCount()764 size_t getRRCount() {
765 std::lock_guard<std::mutex> lck(test_mutex_);
766 return (clientRRs_.size());
767 }
768
769 /// @brief IO service used in the tests.
770 IOService io_service_;
771
772 /// @brief Instance of the client used in the tests.
773 HttpClientPtr client_;
774
775 /// @brief Instance of the listener used in the tests.
776 HttpListenerPtr listener_;
777
778 /// @brief Pointer to the response creator factory.
779 HttpResponseCreatorFactoryPtr factory_;
780
781 /// @brief List of listeners.
782 std::vector<HttpListenerPtr> listeners_;
783
784 /// @brief List of response factories.
785 std::vector<HttpResponseCreatorFactoryPtr> factories_;
786
787 /// @brief Asynchronous timer service to detect timeouts.
788 IntervalTimer test_timer_;
789
790 /// @brief Number of threads HttpClient should use.
791 size_t num_threads_;
792
793 /// @brief Number of request batches to conduct.
794 size_t num_batches_;
795
796 /// @brief Number of listeners to start.
797 size_t num_listeners_;
798
799 /// @brief Number of expected requests to carry out.
800 size_t expected_requests_;
801
802 /// @brief Number of requests that are in progress.
803 size_t num_in_progress_;
804
805 /// @brief Number of requests that have been completed.
806 size_t num_finished_;
807
808 /// @brief a List of client request-response pairs.
809 std::vector<ClientRRPtr> clientRRs_;
810
811 /// @brief Mutex for locking.
812 std::mutex test_mutex_;
813
814 /// @brief Condition variable used to make client threads wait
815 /// until number of in-progress requests reaches the number
816 /// of client requests.
817 std::condition_variable test_cv_;
818
819 /// @brief Indicates if client threads are currently "paused".
820 bool paused_;
821
822 /// @brief Number of times client has been paused during the test.
823 size_t pause_cnt_;
824 };
825
// Verifies we can construct and destruct, in both single
// and multi-threaded modes.
TEST_F(MultiThreadingHttpClientTest, basics) {
    // Disable Kea core multi-threading: only single-threaded (thread pool
    // size 0) clients may be created in this mode.
    MultiThreadingMgr::instance().setMode(false);
    HttpClientPtr client;

    // Value of 0 for thread_pool_size means single-threaded.
    ASSERT_NO_THROW_LOG(client.reset(new HttpClient(io_service_, 0)));
    ASSERT_TRUE(client);

    // Single-threaded clients have no internal IOService and no threads.
    ASSERT_FALSE(client->getThreadIOService());
    ASSERT_EQ(client->getThreadPoolSize(), 0);
    ASSERT_EQ(client->getThreadCount(), 0);

    // Make sure destruction doesn't throw.
    ASSERT_NO_THROW_LOG(client.reset());

    // Non-zero thread-pool-size means multi-threaded mode, should throw.
    // NOTE(review): the expected message concatenates to "...must be
    // zerowhen Kea core multi-threading is disabled" (no space) — this
    // presumably mirrors the same concatenation bug in the thrower; if the
    // implementation message is ever corrected, fix this string too.
    ASSERT_THROW_MSG(client.reset(new HttpClient(io_service_, 1)), InvalidOperation,
                     "HttpClient thread_pool_size must be zero"
                     "when Kea core multi-threading is disabled");
    ASSERT_FALSE(client);

    // Enable Kea core multi-threading.
    MultiThreadingMgr::instance().setMode(true);

    // Multi-threaded construction should work now.
    ASSERT_NO_THROW_LOG(client.reset(new HttpClient(io_service_, 3)));
    ASSERT_TRUE(client);

    // Verify that it has an internal IOService and that thread pool size
    // and thread count match.
    ASSERT_TRUE(client->getThreadIOService());
    EXPECT_FALSE(client->getThreadIOService()->stopped());
    ASSERT_EQ(client->getThreadPoolSize(), 3);
    ASSERT_EQ(client->getThreadCount(), 3);

    // Check convenience functions.
    ASSERT_TRUE(client->isRunning());
    ASSERT_FALSE(client->isPaused());
    ASSERT_FALSE(client->isStopped());

    // Verify stop doesn't throw.
    ASSERT_NO_THROW_LOG(client->stop());

    // Verify we're stopped: the IOService object survives the stop but is
    // in the stopped state, and all pool threads have exited.
    ASSERT_TRUE(client->getThreadIOService());
    EXPECT_TRUE(client->getThreadIOService()->stopped());
    ASSERT_EQ(client->getThreadPoolSize(), 3);
    ASSERT_EQ(client->getThreadCount(), 0);

    // Check convenience functions.
    ASSERT_FALSE(client->isRunning());
    ASSERT_FALSE(client->isPaused());
    ASSERT_TRUE(client->isStopped());

    // Verify a second call to stop() doesn't throw (stop is idempotent).
    ASSERT_NO_THROW_LOG(client->stop());

    // Make sure destruction doesn't throw.
    ASSERT_NO_THROW_LOG(client.reset());

    // Create another multi-threaded instance.
    ASSERT_NO_THROW_LOG(client.reset(new HttpClient(io_service_, 3)));

    // Make sure destruction doesn't throw, even without an explicit stop().
    ASSERT_NO_THROW_LOG(client.reset());
}
894
895 // Verifies we can construct with deferred start.
TEST_F(MultiThreadingHttpClientTest,deferredStart)896 TEST_F(MultiThreadingHttpClientTest, deferredStart) {
897 MultiThreadingMgr::instance().setMode(true);
898 HttpClientPtr client;
899 size_t thread_pool_size = 3;
900
901 // Create MT client with deferred start.
902 ASSERT_NO_THROW_LOG(client.reset(new HttpClient(io_service_, thread_pool_size, true)));
903 ASSERT_TRUE(client);
904
905 // Client should be STOPPED, with no threads.
906 ASSERT_TRUE(client->getThreadIOService());
907 ASSERT_EQ(client->getThreadPoolSize(), thread_pool_size);
908 ASSERT_EQ(client->getThreadCount(), 0);
909
910 // Check convenience functions.
911 ASSERT_FALSE(client->isRunning());
912 ASSERT_FALSE(client->isPaused());
913 ASSERT_TRUE(client->isStopped());
914
915 // We should be able to start it.
916 ASSERT_NO_THROW(client->start());
917
918 // Verify we have threads and run state is RUNNING.
919 ASSERT_EQ(client->getThreadCount(), 3);
920 ASSERT_TRUE(client->getThreadIOService());
921 ASSERT_FALSE(client->getThreadIOService()->stopped());
922
923 // Check convenience functions.
924 ASSERT_TRUE(client->isRunning());
925 ASSERT_FALSE(client->isPaused());
926 ASSERT_FALSE(client->isStopped());
927
928 // Second call to start should be harmless.
929 ASSERT_NO_THROW_LOG(client->start());
930
931 // Verify we didn't break it.
932 ASSERT_EQ(client->getThreadCount(), 3);
933 ASSERT_TRUE(client->isRunning());
934
935 // Make sure destruction doesn't throw.
936 ASSERT_NO_THROW_LOG(client.reset());
937 }
938
939 // Verifies we can restart after stop.
TEST_F(MultiThreadingHttpClientTest,restartAfterStop)940 TEST_F(MultiThreadingHttpClientTest, restartAfterStop) {
941 MultiThreadingMgr::instance().setMode(true);
942 HttpClientPtr client;
943 size_t thread_pool_size = 3;
944
945 // Create MT client with instant start.
946 ASSERT_NO_THROW_LOG(client.reset(new HttpClient(io_service_, thread_pool_size)));
947 ASSERT_TRUE(client);
948
949 // Verify we're started.
950 ASSERT_EQ(client->getThreadCount(), 3);
951 ASSERT_TRUE(client->getThreadIOService());
952 ASSERT_FALSE(client->getThreadIOService()->stopped());
953 ASSERT_TRUE(client->isRunning());
954
955 // Stop should succeed.
956 ASSERT_NO_THROW_LOG(client->stop());
957
958 // Verify we're stopped.
959 ASSERT_EQ(client->getThreadCount(), 0);
960 ASSERT_TRUE(client->getThreadIOService());
961 ASSERT_TRUE(client->getThreadIOService()->stopped());
962 ASSERT_TRUE(client->isStopped());
963
964 // Starting again should succeed.
965 ASSERT_NO_THROW_LOG(client->start());
966
967 // Verify we didn't break it.
968 ASSERT_EQ(client->getThreadCount(), 3);
969 ASSERT_TRUE(client->getThreadIOService());
970 ASSERT_FALSE(client->getThreadIOService()->stopped());
971 ASSERT_TRUE(client->isRunning());
972
973 // Make sure destruction doesn't throw.
974 ASSERT_NO_THROW_LOG(client.reset());
975 }
976
977 // Now we'll run some permutations of the number of client threads,
978 // requests, and listeners.
979
980 // Single-threaded, three batches, one listener.
TEST_F(MultiThreadingHttpClientTest,zeroByThreeByOne)981 TEST_F(MultiThreadingHttpClientTest, zeroByThreeByOne) {
982 size_t num_threads = 0; // Zero threads = ST mode.
983 size_t num_batches = 3;
984 threadRequestAndReceive(num_threads, num_batches);
985 }
986
987 // Single-threaded, three batches, three listeners.
TEST_F(MultiThreadingHttpClientTest,zeroByThreeByThree)988 TEST_F(MultiThreadingHttpClientTest, zeroByThreeByThree) {
989 size_t num_threads = 0; // Zero threads = ST mode.
990 size_t num_batches = 3;
991 size_t num_listeners = 3;
992 threadRequestAndReceive(num_threads, num_batches, num_listeners);
993 }
994
995 // Multi-threaded with one thread, three batches, one listener
TEST_F(MultiThreadingHttpClientTest,oneByThreeByOne)996 TEST_F(MultiThreadingHttpClientTest, oneByThreeByOne) {
997 size_t num_threads = 1;
998 size_t num_batches = 3;
999 threadRequestAndReceive(num_threads, num_batches);
1000 }
1001
1002 // Multi-threaded with three threads, three batches, one listener
TEST_F(MultiThreadingHttpClientTest,threeByThreeByOne)1003 TEST_F(MultiThreadingHttpClientTest, threeByThreeByOne) {
1004 size_t num_threads = 3;
1005 size_t num_batches = 3;
1006 threadRequestAndReceive(num_threads, num_batches);
1007 }
1008
1009 // Multi-threaded with three threads, nine batches, one listener
TEST_F(MultiThreadingHttpClientTest,threeByNineByOne)1010 TEST_F(MultiThreadingHttpClientTest, threeByNineByOne) {
1011 size_t num_threads = 3;
1012 size_t num_batches = 9;
1013 threadRequestAndReceive(num_threads, num_batches);
1014 }
1015
1016 // Multi-threaded with two threads, four batches, two listeners
TEST_F(MultiThreadingHttpClientTest,twoByFourByTwo)1017 TEST_F(MultiThreadingHttpClientTest, twoByFourByTwo) {
1018 size_t num_threads = 2;
1019 size_t num_batches = 4;
1020 size_t num_listeners = 2;
1021 threadRequestAndReceive(num_threads, num_batches, num_listeners);
1022 }
1023
1024 // Multi-threaded with four threads, four batches, two listeners
TEST_F(MultiThreadingHttpClientTest,fourByFourByTwo)1025 TEST_F(MultiThreadingHttpClientTest, fourByFourByTwo) {
1026 size_t num_threads = 4;
1027 size_t num_batches = 4;
1028 size_t num_listeners = 2;
1029 threadRequestAndReceive(num_threads, num_batches, num_listeners);
1030 }
1031
1032 // Verifies that we can cleanly pause, resume, and shutdown while doing
1033 // multi-threaded work.
TEST_F(MultiThreadingHttpClientTest,workPauseResumeShutdown)1034 TEST_F(MultiThreadingHttpClientTest, workPauseResumeShutdown) {
1035 size_t num_threads = 4;
1036 size_t num_batches = 4;
1037 size_t num_listeners = 4;
1038 size_t num_pauses = 3;
1039 workPauseResumeShutdown(num_threads, num_batches, num_listeners, num_pauses);
1040 }
1041
1042 } // end of anonymous namespace
1043