1 // Copyright (C) 2021 Internet Systems Consortium, Inc. ("ISC")
2 //
3 // This Source Code Form is subject to the terms of the Mozilla Public
4 // License, v. 2.0. If a copy of the MPL was not distributed with this
5 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
6 
7 #include <config.h>
8 
9 #include <asiolink/asio_wrapper.h>
10 #include <asiolink/interval_timer.h>
11 #include <cc/data.h>
12 #include <http/client.h>
13 #include <http/listener.h>
14 #include <http/post_request_json.h>
15 #include <http/response_creator.h>
16 #include <http/response_creator_factory.h>
17 #include <http/response_json.h>
18 #include <http/url.h>
19 #include <util/multi_threading_mgr.h>
20 #include <testutils/gtest_utils.h>
21 
22 #include <boost/asio/ip/tcp.hpp>
23 #include <boost/pointer_cast.hpp>
24 #include <gtest/gtest.h>
25 
26 #include <functional>
27 #include <sstream>
28 #include <string>
29 
30 using namespace isc;
31 using namespace isc::asiolink;
32 using namespace isc::data;
33 using namespace isc::http;
34 using namespace isc::util;
35 namespace ph = std::placeholders;
36 
37 namespace {
38 
39 /// @brief IP address to which HTTP service is bound.
40 const std::string SERVER_ADDRESS = "127.0.0.1";
41 
42 /// @brief Port number to which HTTP service is bound.
43 const unsigned short SERVER_PORT = 18123;
44 
45 /// @brief Test timeout (ms).
46 const long TEST_TIMEOUT = 10000;
47 
48 /// @brief Container request/response pair handled by a given thread.
49 struct ClientRR {
50     /// @brief Thread id of the client thread handling the request as a string.
51     std::string thread_id_;
52 
53     /// @brief HTTP request submitted by the client thread.
54     PostHttpRequestJsonPtr request_;
55 
56     /// @brief HTTP response received by the client thread.
57     HttpResponseJsonPtr response_;
58 };
59 
60 /// @brief Pointer to a ClientRR instance.
61 typedef boost::shared_ptr<ClientRR> ClientRRPtr;
62 
63 /// @brief Implementation of the @ref HttpResponseCreator.
64 ///
65 /// Creates a response to a request containing body content
66 /// as follows:
67 ///
68 /// ```
69 ///     { "sequence" : nnnn }
70 /// ```
71 ///
72 /// The response will include the sequence number of the request
73 /// as well as the server port passed into the creator's constructor:
74 ///
75 /// ```
76 ///     { "sequence": nnnn, "server-port": xxxx }
77 /// ```
78 class TestHttpResponseCreator : public HttpResponseCreator {
79 public:
80     /// @brief Constructor
81     ///
82     /// @param server_port integer value the server listens upon, it is
83     /// echoed back in responses as "server-port".
TestHttpResponseCreator(uint16_t server_port)84     TestHttpResponseCreator(uint16_t server_port)
85     : server_port_(server_port) { }
86 
87     /// @brief Create a new request.
88     ///
89     /// @return Pointer to the new instance of the @ref HttpRequest.
90     virtual HttpRequestPtr
createNewHttpRequest() const91     createNewHttpRequest() const {
92         return (HttpRequestPtr(new PostHttpRequestJson()));
93     }
94 
95 private:
96     /// @brief Creates HTTP response.
97     ///
98     /// @param request Pointer to the HTTP request.
99     /// @param status_code status code to include in the response.
100     ///
101     /// @return Pointer to the generated HTTP response.
102     virtual HttpResponsePtr
createStockHttpResponse(const HttpRequestPtr & request,const HttpStatusCode & status_code) const103     createStockHttpResponse(const HttpRequestPtr& request,
104                             const HttpStatusCode& status_code) const {
105         // The request hasn't been finalized so the request object
106         // doesn't contain any information about the HTTP version number
107         // used. But, the context should have this data (assuming the
108         // HTTP version is parsed OK).
109         HttpVersion http_version(request->context()->http_version_major_,
110                                  request->context()->http_version_minor_);
111         // This will generate the response holding JSON content.
112         HttpResponseJsonPtr response(new HttpResponseJson(http_version, status_code));
113         response->finalize();
114         return (response);
115     }
116 
117     /// @brief Creates HTTP response.
118     ///
119     /// Generates a response which echoes the requests sequence
120     /// number as well as the creator's server port value. Responses
121     /// should appear as follows:
122     ///
123     /// ```
124     ///     { "sequence" : nnnn }
125     /// ```
126     ///
127     /// @param request Pointer to the HTTP request.
128     /// @return Pointer to the generated HTTP OK response with no content.
129     virtual HttpResponsePtr
createDynamicHttpResponse(HttpRequestPtr request)130     createDynamicHttpResponse(HttpRequestPtr request) {
131         // Request must always be JSON.
132         PostHttpRequestJsonPtr request_json =
133             boost::dynamic_pointer_cast<PostHttpRequestJson>(request);
134         if (!request_json) {
135             return (createStockHttpResponse(request, HttpStatusCode::BAD_REQUEST));
136         }
137 
138         // Extract the sequence from the request.
139         ConstElementPtr sequence = request_json->getJsonElement("sequence");
140         if (!sequence) {
141             return (createStockHttpResponse(request, HttpStatusCode::BAD_REQUEST));
142         }
143 
144         // Create the response.
145         HttpResponseJsonPtr response(new HttpResponseJson(request->getHttpVersion(),
146                                                           HttpStatusCode::OK));
147         // Construct the body.
148         ElementPtr body = Element::createMap();
149         body->set("server-port", Element::create(server_port_));
150         body->set("sequence", sequence);
151 
152         // Echo request body back in the response.
153         response->setBodyAsJson(body);
154 
155         response->finalize();
156         return (response);
157     }
158 
159     /// @brief Port upon which this creator's server is listening.
160     ///
161     /// The intent is to use the value to determine which server generated
162     /// a given response.
163     uint16_t server_port_;
164 };
165 
166 /// @brief Implementation of the test @ref HttpResponseCreatorFactory.
167 ///
168 /// This factory class creates @ref TestHttpResponseCreator instances.
169 class TestHttpResponseCreatorFactory : public HttpResponseCreatorFactory {
170 public:
171 
172     /// @brief Constructor
173     ///
174     /// @param server_port port upon with the server is listening. This
175     /// value will be included in responses such that each response
176     /// can be attributed to a specific server.
TestHttpResponseCreatorFactory(uint16_t server_port)177     TestHttpResponseCreatorFactory(uint16_t server_port)
178     : server_port_(server_port) {};
179 
180     /// @brief Creates @ref TestHttpResponseCreator instance.
create() const181     virtual HttpResponseCreatorPtr create() const {
182         HttpResponseCreatorPtr response_creator(new TestHttpResponseCreator(server_port_));
183         return (response_creator);
184     }
185 
186     /// @brief Port upon which this factory's server is listening.
187     ///
188     /// The intent is to use the value to determine which server generated
189     /// a given response.
190     uint16_t server_port_;
191 };
192 
193 /// @brief Test fixture class for testing threading modes of HTTP client.
194 class MultiThreadingHttpClientTest : public ::testing::Test {
195 public:
196 
197     /// @brief Constructor.
MultiThreadingHttpClientTest()198     MultiThreadingHttpClientTest()
199         : io_service_(), client_(), listener_(), factory_(), listeners_(), factories_(),
200           test_timer_(io_service_), num_threads_(0), num_batches_(0), num_listeners_(0),
201           expected_requests_(0), num_in_progress_(0), num_finished_(0), paused_(false),
202           pause_cnt_(0) {
203         test_timer_.setup(std::bind(&MultiThreadingHttpClientTest::timeoutHandler, this, true),
204                           TEST_TIMEOUT, IntervalTimer::ONE_SHOT);
205         MultiThreadingMgr::instance().setMode(true);
206     }
207 
208     /// @brief Destructor.
~MultiThreadingHttpClientTest()209     ~MultiThreadingHttpClientTest() {
210         // Stop the client.
211         if (client_) {
212             client_->stop();
213         }
214 
215         // Stop all listeners.
216         for (const auto& listener : listeners_) {
217             listener->stop();
218         }
219 
220         MultiThreadingMgr::instance().setMode(false);
221     }
222 
223     /// @brief Callback function to invoke upon test timeout.
224     ///
225     /// It stops the IO service and reports test timeout.
226     ///
227     /// @param fail_on_timeout Specifies if test failure should be reported.
timeoutHandler(const bool fail_on_timeout)228     void timeoutHandler(const bool fail_on_timeout) {
229         if (fail_on_timeout) {
230             ADD_FAILURE() << "Timeout occurred while running the test!";
231         }
232         io_service_.stop();
233     }
234 
235     /// @brief Runs the test's IOService until the desired number of requests
236     /// have been carried out or the test fails.
runIOService(size_t request_limit)237     void runIOService(size_t request_limit) {
238         while (getRRCount() < request_limit) {
239             // Always call reset() before we call run();
240             io_service_.restart();
241 
242             // Run until a client stops the service.
243             io_service_.run();
244         }
245     }
246 
247     /// @brief Creates an HTTP request with JSON body.
248     ///
249     /// It includes a JSON parameter with a specified value.
250     ///
251     /// @param parameter_name JSON parameter to be included.
252     /// @param value JSON parameter value.
253     /// @param version HTTP version to be used. Default is HTTP/1.1.
254     template<typename ValueType>
createRequest(const std::string & parameter_name,const ValueType & value,const HttpVersion & version=HttpVersion (1,1))255     PostHttpRequestJsonPtr createRequest(const std::string& parameter_name,
256                                          const ValueType& value,
257                                          const HttpVersion& version = HttpVersion(1, 1)) {
258         // Create POST request with JSON body.
259         PostHttpRequestJsonPtr request(new PostHttpRequestJson(HttpRequest::Method::HTTP_POST,
260                                                                "/boo", version));
261         // Body is a map with a specified parameter included.
262         ElementPtr body = Element::createMap();
263         body->set(parameter_name, Element::create(value));
264         request->setBodyAsJson(body);
265         try {
266             request->finalize();
267         } catch (const std::exception& ex) {
268             ADD_FAILURE() << "failed to create request: " << ex.what();
269         }
270 
271         return (request);
272     }
273 
274     /// @brief Test that worker threads are not permitted to change thread pool
275     /// state.
testIllegalThreadPoolActions()276     void testIllegalThreadPoolActions() {
277         ASSERT_THROW(client_->start(), MultiThreadingInvalidOperation);
278         ASSERT_THROW(client_->pause(), MultiThreadingInvalidOperation);
279         ASSERT_THROW(client_->resume(), MultiThreadingInvalidOperation);
280     }
281 
282     /// @brief Initiates a single HTTP request.
283     ///
284     /// Constructs an HTTP post whose body is a JSON map containing a
285     /// single integer element, "sequence".
286     ///
287     /// The request completion handler will block each requesting thread
288     /// until the number of in-progress threads reaches the number of
289     /// threads in the pool. At that point, the handler will unblock
290     /// until all threads have finished preparing their response and are
291     /// ready to return. The handler will then notify all pending threads
292     /// and invoke stop() on the test's main IO service thread.
293     ///
294     /// @param sequence value for the integer element, "sequence",
295     /// to send in the request.
startRequest(int sequence,int port_offset=0)296     void startRequest(int sequence, int port_offset = 0) {
297         // Create the URL on which the server can be reached.
298         std::stringstream ss;
299         ss << "http://" << SERVER_ADDRESS << ":" << (SERVER_PORT + port_offset);
300         Url url(ss.str());
301 
302         // Initiate request to the server.
303         PostHttpRequestJsonPtr request_json = createRequest("sequence", sequence);
304         HttpResponseJsonPtr response_json = boost::make_shared<HttpResponseJson>();
305         ASSERT_NO_THROW(client_->asyncSendRequest(url, TlsContextPtr(),
306                                                   request_json, response_json,
307             [this, request_json, response_json](const boost::system::error_code& ec,
308                                                 const HttpResponsePtr&,
309                                                 const std::string&) {
310             // Bail on an error.
311             ASSERT_FALSE(ec) << "asyncSendRequest failed, ec: " << ec;
312 
313             // Wait here until we have as many in progress as we have threads.
314             {
315                 std::unique_lock<std::mutex> lck(test_mutex_);
316                 ++num_in_progress_;
317                 if (num_threads_ == 0 || num_in_progress_ == num_threads_) {
318                     // Everybody has one, let's go.
319                     num_finished_ = 0;
320                     test_cv_.notify_all();
321                 } else {
322                     // I'm ready but others aren't wait here.
323                     bool ret = test_cv_.wait_for(lck, std::chrono::seconds(10),
324                                                  [&]() { return (num_in_progress_ == num_threads_); });
325                     if (!ret) {
326                         ADD_FAILURE() << "clients failed to start work";
327                     }
328                 }
329             }
330 
331             // If running on multiple threads, threads should be prohibited from
332             // changing the thread pool state.
333             if (num_threads_) {
334                 testIllegalThreadPoolActions();
335             }
336 
337             // Get stringified thread-id.
338             std::stringstream ss;
339             ss << std::this_thread::get_id();
340 
341             // Create the ClientRR.
342             ClientRRPtr clientRR(new ClientRR());
343             clientRR->thread_id_ = ss.str();
344             clientRR->request_ = request_json;
345             clientRR->response_ = response_json;
346 
347             // Wait here until we have as many ready to finish as we have threads.
348             {
349                 std::unique_lock<std::mutex> lck(test_mutex_);
350                 ++num_finished_;
351                 clientRRs_.push_back(clientRR);
352                 if (num_threads_ == 0 || num_finished_ == num_threads_) {
353                     // We're all done, notify the others and finish.
354                     num_in_progress_ = 0;
355                     test_cv_.notify_all();
356                     // Stop the test's IOService.
357                     io_service_.stop();
358                 } else {
359                     // I'm done but others aren't wait here.
360                     bool ret = test_cv_.wait_for(lck, std::chrono::seconds(10),
361                                                  [&]() { return (num_finished_ == num_threads_); });
362                     if (!ret) {
363                         ADD_FAILURE() << "clients failed to finish work";
364                     }
365                 }
366             }
367         }));
368     }
369 
370     /// @brief Initiates a single HTTP request.
371     ///
372     /// Constructs an HTTP post whose body is a JSON map containing a
373     /// single integer element, "sequence".
374     ///
375     /// The request completion handler simply constructs the response,
376     /// and adds it the list of completed request/responses. If the
377     /// number of completed requests has reached the expected number
378     /// it stops the test IOService.
379     ///
380     /// @param sequence value for the integer element, "sequence",
381     /// to send in the request.
startRequestSimple(int sequence,int port_offset=0)382     void startRequestSimple(int sequence, int port_offset = 0) {
383         // Create the URL on which the server can be reached.
384         std::stringstream ss;
385         ss << "http://" << SERVER_ADDRESS << ":" << (SERVER_PORT + port_offset);
386         Url url(ss.str());
387 
388         // Initiate request to the server.
389         PostHttpRequestJsonPtr request_json = createRequest("sequence", sequence);
390         HttpResponseJsonPtr response_json = boost::make_shared<HttpResponseJson>();
391         ASSERT_NO_THROW(client_->asyncSendRequest(url, TlsContextPtr(),
392                                                   request_json, response_json,
393             [this, request_json, response_json](const boost::system::error_code& ec,
394                                                 const HttpResponsePtr&,
395                                                 const std::string&) {
396             // Bail on an error.
397             ASSERT_FALSE(ec) << "asyncSendRequest failed, ec: " << ec;
398 
399             // Get stringified thread-id.
400             std::stringstream ss;
401             ss << std::this_thread::get_id();
402 
403             // Create the ClientRR.
404             ClientRRPtr clientRR(new ClientRR());
405             clientRR->thread_id_ = ss.str();
406             clientRR->request_ = request_json;
407             clientRR->response_ = response_json;
408 
409             {
410                 std::unique_lock<std::mutex> lck(test_mutex_);
411                 clientRRs_.push_back(clientRR);
412                 ++num_finished_;
413                 if ((num_finished_ >= expected_requests_) && !io_service_.stopped()) {
414                     io_service_.stop();
415                 }
416             }
417 
418         }));
419     }
420 
421     /// @brief Carries out HTTP requests via HttpClient to HTTP listener(s).
422     ///
423     /// This function creates one HttpClient with the given number
424     /// of threads and then the given number of HttpListeners. It then
425     /// initiates the given number of request batches where each batch
426     /// contains one request per thread per listener.
427     ///
428     /// Then it iteratively runs the test's IOService until all
429     /// the requests have been responded to, an error occurs, or the
430     /// test times out.
431     ///
432     /// Each request carries a single integer element, "sequence", which
433     /// uniquely identifies the request. Each response is expected to
434     /// contain this value echoed back along with the listener's server
435     /// port number. Thus each response can be matched to it's request
436     /// and to the listener that handled the request.
437     ///
438     /// After all requests have been conducted, the function verifies
439     /// that:
440     ///
441     /// 1. The number of requests conducted is correct
442     /// 2. The sequence numbers in request-response pairs match
443     /// 3. Each client thread handled the same number of requests
444     /// 4. Each listener handled the same number of requests
445     ///
446     /// @param num_threads number of threads the HttpClient should use.
447     /// A value of 0 puts the HttpClient in single-threaded mode.
448     /// @param num_batches number of batches of requests that should be
449     /// conducted.
450     /// @param num_listeners number of HttpListeners to create. Defaults
451     /// to 1.
threadRequestAndReceive(size_t num_threads,size_t num_batches,size_t num_listeners=1)452     void threadRequestAndReceive(size_t num_threads, size_t num_batches,
453                                  size_t num_listeners = 1) {
454         ASSERT_TRUE(num_batches);
455         ASSERT_TRUE(num_listeners);
456         num_threads_ = num_threads;
457         num_batches_ = num_batches;
458         num_listeners_ = num_listeners;
459 
460         // Client in ST is, in effect, 1 thread.
461         size_t effective_threads = (num_threads_ == 0 ? 1 : num_threads_);
462 
463         // Calculate the expected number of requests.
464         expected_requests_ = (num_batches_ * num_listeners_ * effective_threads);
465 
466         for (auto i = 0; i < num_listeners_; ++i) {
467             // Make a factory
468             HttpResponseCreatorFactoryPtr factory(new TestHttpResponseCreatorFactory(SERVER_PORT + i));
469             factories_.push_back(factory);
470 
471             // Need to create a Listener on
472             HttpListenerPtr listener(new HttpListener(io_service_,
473                                                       IOAddress(SERVER_ADDRESS), (SERVER_PORT + i),
474                                                       TlsContextPtr(), factory,
475                                                       HttpListener::RequestTimeout(10000),
476                                                       HttpListener::IdleTimeout(10000)));
477             listeners_.push_back(listener);
478 
479             // Start the server.
480             ASSERT_NO_THROW(listener->start());
481         }
482 
483         // Create an MT client with num_threads
484         ASSERT_NO_THROW_LOG(client_.reset(new HttpClient(io_service_, num_threads)));
485         ASSERT_TRUE(client_);
486 
487         if (num_threads_ == 0) {
488             // If we single-threaded client should not have it's own IOService.
489             ASSERT_FALSE(client_->getThreadIOService());
490         } else {
491             // If we multi-threaded client should have it's own IOService.
492             ASSERT_TRUE(client_->getThreadIOService());
493         }
494 
495         // Verify the pool size and number of threads are as expected.
496         ASSERT_EQ(client_->getThreadPoolSize(), num_threads);
497         ASSERT_EQ(client_->getThreadCount(), num_threads);
498 
499         // Start the requisite number of requests:
500         //   batch * listeners * threads.
501         int sequence = 0;
502         for (auto b = 0; b < num_batches; ++b) {
503             for (auto l = 0; l < num_listeners_; ++l) {
504                 for (auto t = 0; t < effective_threads; ++t) {
505                     startRequest(++sequence, l);
506                 }
507             }
508         }
509 
510         // Loop until the clients are done, an error occurs, or the time runs out.
511         runIOService(expected_requests_);
512 
513         // Client should stop without issue.
514         ASSERT_NO_THROW(client_->stop());
515 
516         // Listeners should stop without issue.
517         for (const auto& listener : listeners_) {
518             ASSERT_NO_THROW(listener->stop());
519         }
520 
521         // We should have a response for each request.
522         ASSERT_EQ(getRRCount(), expected_requests_);
523 
524         // Create a map to track number of responses for each client thread.
525         std::map<std::string, int> responses_per_thread;
526 
527         // Create a map to track number of responses for each listener port.
528         std::map<uint16_t, int> responses_per_listener;
529 
530         // Get the stringified thread-id of the test's main thread.
531         std::stringstream ss;
532         ss << std::this_thread::get_id();
533         std::string main_thread_id = ss.str();
534 
535         // Iterate over the client request/response pairs.
536         for (auto const& clientRR : clientRRs_) {
537             // Make sure it's whole.
538             ASSERT_FALSE(clientRR->thread_id_.empty());
539             ASSERT_TRUE(clientRR->request_);
540             ASSERT_TRUE(clientRR->response_);
541 
542             // Request should contain an integer sequence number.
543             int request_sequence;
544             ConstElementPtr sequence = clientRR->request_->getJsonElement("sequence");
545             ASSERT_TRUE(sequence);
546             ASSERT_NO_THROW(request_sequence = sequence->intValue());
547 
548             // Response should contain an integer sequence number.
549             int response_sequence;
550             sequence = clientRR->response_->getJsonElement("sequence");
551             ASSERT_TRUE(sequence);
552             ASSERT_NO_THROW(response_sequence = sequence->intValue());
553 
554             // Request and Response sequence numbers should match.
555             ASSERT_EQ(request_sequence, response_sequence);
556 
557             ConstElementPtr server_port_elem = clientRR->response_->getJsonElement("server-port");
558             ASSERT_TRUE(server_port_elem);
559             uint16_t server_port = server_port_elem->intValue();
560 
561             if (num_threads_ == 0) {
562                 // For ST mode thread id should always be the main thread.
563                 ASSERT_EQ(clientRR->thread_id_, main_thread_id);
564             } else {
565                 // For MT mode the thread id should never be the main thread.
566                 ASSERT_NE(clientRR->thread_id_, main_thread_id);
567             }
568 
569             // Bump the response count for the given client thread-id.
570             auto rit = responses_per_thread.find(clientRR->thread_id_);
571             if (rit != responses_per_thread.end()) {
572                 responses_per_thread[clientRR->thread_id_] = rit->second + 1;
573             } else {
574                 responses_per_thread[clientRR->thread_id_] = 1;
575             }
576 
577             // Bump the response count for the given server port.
578             auto lit = responses_per_listener.find(server_port);
579             if (lit != responses_per_listener.end()) {
580                 responses_per_listener[server_port] = lit->second + 1;
581             } else {
582                 responses_per_listener[server_port] = 1;
583             }
584         }
585 
586         // Make sure that all client threads received responses.
587         ASSERT_EQ(responses_per_thread.size(), effective_threads);
588 
589         // Make sure that each client thread received the same number of responses.
590         for (auto const& it : responses_per_thread) {
591             EXPECT_EQ(it.second, (num_batches_ * num_listeners_))
592                       << "thread-id: " << it.first
593                       << ", responses: " << it.second << std::endl;
594         }
595 
596         // Make sure that all listeners generated responses.
597         ASSERT_EQ(responses_per_listener.size(), num_listeners_);
598 
599         // Make sure Each listener generated the same number of responses.
600         for (auto const& it : responses_per_listener) {
601             EXPECT_EQ(it.second, (num_batches_ * effective_threads))
602                       << "server-port: " << it.first
603                       << ", responses: " << it.second << std::endl;
604         }
605     }
606 
607     /// @brief Verifies the client can be paused and resumed repeatedly
608     /// while doing multi-threaded work.
609     ///
610     /// @param num_threads number of threads the HttpClient should use.
611     /// Must be greater than zero, this test does not make sense for a
612     /// single threaded client.
613     /// @param num_batches number of batches of requests that should be
614     /// conducted.
615     /// @param num_listeners number of HttpListeners to create.
616     /// @param num_pauses number of pauses to conduct.
workPauseResumeShutdown(size_t num_threads,size_t num_batches,size_t num_listeners,size_t num_pauses)617     void workPauseResumeShutdown(size_t num_threads, size_t num_batches,
618                                  size_t num_listeners, size_t num_pauses) {
619         ASSERT_TRUE(num_threads);
620         ASSERT_TRUE(num_batches);
621         ASSERT_TRUE(num_listeners);
622         num_threads_ = num_threads;
623         num_batches_ = num_batches;
624         num_listeners_ = num_listeners;
625 
626         // Calculate the total expected number of requests.
627         size_t total_requests = (num_batches_ * num_listeners_ * num_threads_);
628 
629         // Create the listeners.
630         for (auto i = 0; i < num_listeners_; ++i) {
631             // Make a factory
632             HttpResponseCreatorFactoryPtr factory(new TestHttpResponseCreatorFactory(SERVER_PORT + i));
633             factories_.push_back(factory);
634 
635             // Need to create a Listener on
636             HttpListenerPtr listener(new HttpListener(io_service_,
637                                                       IOAddress(SERVER_ADDRESS), (SERVER_PORT + i),
638                                                       TlsContextPtr(), factory,
639                                                       HttpListener::RequestTimeout(10000),
640                                                       HttpListener::IdleTimeout(10000)));
641             listeners_.push_back(listener);
642 
643             // Start the server.
644             ASSERT_NO_THROW(listener->start());
645         }
646 
647         // Create an instant start, MT client with num_threads
648         ASSERT_NO_THROW_LOG(client_.reset(new HttpClient(io_service_, num_threads)));
649         ASSERT_TRUE(client_);
650 
651         // Client should be running. Check convenience functions.
652         ASSERT_TRUE(client_->isRunning());
653         ASSERT_FALSE(client_->isPaused());
654         ASSERT_FALSE(client_->isStopped());
655 
656         // Verify the pool size and number of threads are as expected.
657         ASSERT_EQ(client_->getThreadPoolSize(), num_threads);
658         ASSERT_EQ(client_->getThreadCount(), num_threads);
659 
660         // Start the requisite number of requests:
661         //   batch * listeners * threads.
662         int sequence = 0;
663         for (auto b = 0; b < num_batches; ++b) {
664             for (auto l = 0; l < num_listeners_; ++l) {
665                 for (auto t = 0; t < num_threads_; ++t) {
666                     startRequestSimple(++sequence, l);
667                 }
668             }
669         }
670 
671         size_t rr_count = 0;
672         while (rr_count < total_requests) {
673             size_t request_limit = (pause_cnt_ < num_pauses ?
674                                     (rr_count + ((total_requests - rr_count) / num_pauses))
675                                     : total_requests);
676 
677             // Run test IOService until we hit the limit.
678             runIOService(request_limit);
679 
680             // If we've done all our pauses we should be through.
681             if (pause_cnt_ == num_pauses) {
682                 break;
683             }
684 
685             // Pause the client.
686             ASSERT_NO_THROW(client_->pause());
687             ASSERT_TRUE(client_->isPaused());
688             ++pause_cnt_;
689 
690             // Check our progress.
691             rr_count = getRRCount();
692             ASSERT_GE(rr_count, request_limit);
693 
694             // Resume the client.
695             ASSERT_NO_THROW(client_->resume());
696             ASSERT_TRUE(client_->isRunning());
697         }
698 
699         // Client should stop without issue.
700         ASSERT_NO_THROW(client_->stop());
701         ASSERT_TRUE(client_->isStopped());
702 
703         // We should have finished all our requests.
704         ASSERT_EQ(getRRCount(), total_requests);
705 
706         // Stopping again should be harmless.
707         ASSERT_NO_THROW(client_->stop());
708 
709         // Listeners should stop without issue.
710         for (const auto& listener : listeners_) {
711             ASSERT_NO_THROW(listener->stop());
712         }
713 
714         // Get the stringified thread-id of the test's main thread.
715         std::stringstream ss;
716         ss << std::this_thread::get_id();
717         std::string main_thread_id = ss.str();
718 
719         // Tracks the number for requests fulfilled by main thread.
720         size_t worked_by_main = 0;
721 
722         // Iterate over the client request/response pairs.
723         for (auto const& clientRR : clientRRs_) {
724             // Make sure it's whole.
725             ASSERT_FALSE(clientRR->thread_id_.empty());
726             ASSERT_TRUE(clientRR->request_);
727             ASSERT_TRUE(clientRR->response_);
728 
729             // Request should contain an integer sequence number.
730             int request_sequence;
731             ConstElementPtr sequence = clientRR->request_->getJsonElement("sequence");
732             ASSERT_TRUE(sequence);
733             ASSERT_NO_THROW(request_sequence = sequence->intValue());
734 
735             // Response should contain an integer sequence number.
736             int response_sequence;
737             sequence = clientRR->response_->getJsonElement("sequence");
738             ASSERT_TRUE(sequence);
739             ASSERT_NO_THROW(response_sequence = sequence->intValue());
740 
741             // Request and Response sequence numbers should match.
742             ASSERT_EQ(request_sequence, response_sequence);
743 
744             ConstElementPtr server_port_elem = clientRR->response_->getJsonElement("server-port");
745             ASSERT_TRUE(server_port_elem);
746 
747             // Track how many requests were completed by the main thread.
748             // These can occur when pausing calls IOService::poll.
749             if (clientRR->thread_id_ == main_thread_id) {
750                 ++worked_by_main;
751             }
752         }
753 
754         // Make sure the majority of the requests were worked by
755         // worker threads.  In theory, the number of calls to poll
756         // times the number of threads is the limit for responses
757         // built by the main thread.
758         ASSERT_LE(worked_by_main, num_pauses * num_threads);
759     }
760 
761     /// @brief Fetch the number of completed requests.
762     ///
763     /// @return number of completed requests.
getRRCount()764     size_t getRRCount() {
765         std::lock_guard<std::mutex> lck(test_mutex_);
766         return (clientRRs_.size());
767     }
768 
769     /// @brief IO service used in the tests.
770     IOService io_service_;
771 
772     /// @brief Instance of the client used in the tests.
773     HttpClientPtr client_;
774 
775     /// @brief Instance of the listener used in the tests.
776     HttpListenerPtr listener_;
777 
778     /// @brief Pointer to the response creator factory.
779     HttpResponseCreatorFactoryPtr factory_;
780 
781     /// @brief List of listeners.
782     std::vector<HttpListenerPtr> listeners_;
783 
784     /// @brief List of response factories.
785     std::vector<HttpResponseCreatorFactoryPtr> factories_;
786 
787     /// @brief Asynchronous timer service to detect timeouts.
788     IntervalTimer test_timer_;
789 
790     /// @brief Number of threads HttpClient should use.
791     size_t num_threads_;
792 
793     /// @brief Number of request batches to conduct.
794     size_t num_batches_;
795 
796     /// @brief Number of listeners to start.
797     size_t num_listeners_;
798 
799     /// @brief Number of expected requests to carry out.
800     size_t expected_requests_;
801 
802     /// @brief Number of requests that are in progress.
803     size_t num_in_progress_;
804 
805     /// @brief Number of requests that have been completed.
806     size_t num_finished_;
807 
808     /// @brief a List of client request-response pairs.
809     std::vector<ClientRRPtr> clientRRs_;
810 
811     /// @brief Mutex for locking.
812     std::mutex test_mutex_;
813 
814     /// @brief Condition variable used to make client threads wait
815     /// until number of in-progress requests reaches the number
816     /// of client requests.
817     std::condition_variable test_cv_;
818 
819     /// @brief Indicates if client threads are currently "paused".
820     bool paused_;
821 
822     /// @brief Number of times client has been paused during the test.
823     size_t pause_cnt_;
824 };
825 
826 // Verifies we can construct and destruct, in both single
827 // and multi-threaded modes.
TEST_F(MultiThreadingHttpClientTest,basics)828 TEST_F(MultiThreadingHttpClientTest, basics) {
829     MultiThreadingMgr::instance().setMode(false);
830     HttpClientPtr client;
831 
832     // Value of 0 for thread_pool_size means single-threaded.
833     ASSERT_NO_THROW_LOG(client.reset(new HttpClient(io_service_, 0)));
834     ASSERT_TRUE(client);
835 
836     ASSERT_FALSE(client->getThreadIOService());
837     ASSERT_EQ(client->getThreadPoolSize(), 0);
838     ASSERT_EQ(client->getThreadCount(), 0);
839 
840     // Make sure destruction doesn't throw.
841     ASSERT_NO_THROW_LOG(client.reset());
842 
843     // Non-zero thread-pool-size means multi-threaded mode, should throw.
844     ASSERT_THROW_MSG(client.reset(new HttpClient(io_service_, 1)), InvalidOperation,
845                                   "HttpClient thread_pool_size must be zero"
846                                   "when Kea core multi-threading is disabled");
847     ASSERT_FALSE(client);
848 
849     // Enable Kea core multi-threading.
850     MultiThreadingMgr::instance().setMode(true);
851 
852     // Multi-threaded construction should work now.
853     ASSERT_NO_THROW_LOG(client.reset(new HttpClient(io_service_, 3)));
854     ASSERT_TRUE(client);
855 
856     // Verify that it has an internal IOService and that thread pool size
857     // and thread count match.
858     ASSERT_TRUE(client->getThreadIOService());
859     EXPECT_FALSE(client->getThreadIOService()->stopped());
860     ASSERT_EQ(client->getThreadPoolSize(), 3);
861     ASSERT_EQ(client->getThreadCount(), 3);
862 
863     // Check convenience functions.
864     ASSERT_TRUE(client->isRunning());
865     ASSERT_FALSE(client->isPaused());
866     ASSERT_FALSE(client->isStopped());
867 
868     // Verify stop doesn't throw.
869     ASSERT_NO_THROW_LOG(client->stop());
870 
871     // Verify we're stopped.
872     ASSERT_TRUE(client->getThreadIOService());
873     EXPECT_TRUE(client->getThreadIOService()->stopped());
874     ASSERT_EQ(client->getThreadPoolSize(), 3);
875     ASSERT_EQ(client->getThreadCount(), 0);
876 
877     // Check convenience functions.
878     ASSERT_FALSE(client->isRunning());
879     ASSERT_FALSE(client->isPaused());
880     ASSERT_TRUE(client->isStopped());
881 
882     // Verify a second call to stop() doesn't throw.
883     ASSERT_NO_THROW_LOG(client->stop());
884 
885     // Make sure destruction doesn't throw.
886     ASSERT_NO_THROW_LOG(client.reset());
887 
888     // Create another multi-threaded instance.
889     ASSERT_NO_THROW_LOG(client.reset(new HttpClient(io_service_, 3)));
890 
891     // Make sure destruction doesn't throw.
892     ASSERT_NO_THROW_LOG(client.reset());
893 }
894 
895 // Verifies we can construct with deferred start.
TEST_F(MultiThreadingHttpClientTest,deferredStart)896 TEST_F(MultiThreadingHttpClientTest, deferredStart) {
897     MultiThreadingMgr::instance().setMode(true);
898     HttpClientPtr client;
899     size_t thread_pool_size = 3;
900 
901     // Create MT client with deferred start.
902     ASSERT_NO_THROW_LOG(client.reset(new HttpClient(io_service_, thread_pool_size, true)));
903     ASSERT_TRUE(client);
904 
905     // Client should be STOPPED, with no threads.
906     ASSERT_TRUE(client->getThreadIOService());
907     ASSERT_EQ(client->getThreadPoolSize(), thread_pool_size);
908     ASSERT_EQ(client->getThreadCount(), 0);
909 
910     // Check convenience functions.
911     ASSERT_FALSE(client->isRunning());
912     ASSERT_FALSE(client->isPaused());
913     ASSERT_TRUE(client->isStopped());
914 
915     // We should be able to start it.
916     ASSERT_NO_THROW(client->start());
917 
918     // Verify we have threads and run state is RUNNING.
919     ASSERT_EQ(client->getThreadCount(), 3);
920     ASSERT_TRUE(client->getThreadIOService());
921     ASSERT_FALSE(client->getThreadIOService()->stopped());
922 
923     // Check convenience functions.
924     ASSERT_TRUE(client->isRunning());
925     ASSERT_FALSE(client->isPaused());
926     ASSERT_FALSE(client->isStopped());
927 
928     // Second call to start should be harmless.
929     ASSERT_NO_THROW_LOG(client->start());
930 
931     // Verify we didn't break it.
932     ASSERT_EQ(client->getThreadCount(), 3);
933     ASSERT_TRUE(client->isRunning());
934 
935     // Make sure destruction doesn't throw.
936     ASSERT_NO_THROW_LOG(client.reset());
937 }
938 
939 // Verifies we can restart after stop.
TEST_F(MultiThreadingHttpClientTest,restartAfterStop)940 TEST_F(MultiThreadingHttpClientTest, restartAfterStop) {
941     MultiThreadingMgr::instance().setMode(true);
942     HttpClientPtr client;
943     size_t thread_pool_size = 3;
944 
945     // Create MT client with instant start.
946     ASSERT_NO_THROW_LOG(client.reset(new HttpClient(io_service_, thread_pool_size)));
947     ASSERT_TRUE(client);
948 
949     // Verify we're started.
950     ASSERT_EQ(client->getThreadCount(), 3);
951     ASSERT_TRUE(client->getThreadIOService());
952     ASSERT_FALSE(client->getThreadIOService()->stopped());
953     ASSERT_TRUE(client->isRunning());
954 
955     // Stop should succeed.
956     ASSERT_NO_THROW_LOG(client->stop());
957 
958     // Verify we're stopped.
959     ASSERT_EQ(client->getThreadCount(), 0);
960     ASSERT_TRUE(client->getThreadIOService());
961     ASSERT_TRUE(client->getThreadIOService()->stopped());
962     ASSERT_TRUE(client->isStopped());
963 
964     // Starting again should succeed.
965     ASSERT_NO_THROW_LOG(client->start());
966 
967     // Verify we didn't break it.
968     ASSERT_EQ(client->getThreadCount(), 3);
969     ASSERT_TRUE(client->getThreadIOService());
970     ASSERT_FALSE(client->getThreadIOService()->stopped());
971     ASSERT_TRUE(client->isRunning());
972 
973     // Make sure destruction doesn't throw.
974     ASSERT_NO_THROW_LOG(client.reset());
975 }
976 
977 // Now we'll run some permutations of the number of client threads,
978 // requests, and listeners.
979 
980 // Single-threaded, three batches, one listener.
TEST_F(MultiThreadingHttpClientTest,zeroByThreeByOne)981 TEST_F(MultiThreadingHttpClientTest, zeroByThreeByOne) {
982     size_t num_threads = 0; // Zero threads = ST mode.
983     size_t num_batches = 3;
984     threadRequestAndReceive(num_threads, num_batches);
985 }
986 
987 // Single-threaded, three batches, three listeners.
TEST_F(MultiThreadingHttpClientTest,zeroByThreeByThree)988 TEST_F(MultiThreadingHttpClientTest, zeroByThreeByThree) {
989     size_t num_threads = 0; // Zero threads = ST mode.
990     size_t num_batches = 3;
991     size_t num_listeners = 3;
992     threadRequestAndReceive(num_threads, num_batches, num_listeners);
993 }
994 
995 // Multi-threaded with one thread, three batches, one listener
TEST_F(MultiThreadingHttpClientTest,oneByThreeByOne)996 TEST_F(MultiThreadingHttpClientTest, oneByThreeByOne) {
997     size_t num_threads = 1;
998     size_t num_batches = 3;
999     threadRequestAndReceive(num_threads, num_batches);
1000 }
1001 
1002 // Multi-threaded with three threads, three batches, one listener
TEST_F(MultiThreadingHttpClientTest,threeByThreeByOne)1003 TEST_F(MultiThreadingHttpClientTest, threeByThreeByOne) {
1004     size_t num_threads = 3;
1005     size_t num_batches = 3;
1006     threadRequestAndReceive(num_threads, num_batches);
1007 }
1008 
1009 // Multi-threaded with three threads, nine batches, one listener
TEST_F(MultiThreadingHttpClientTest,threeByNineByOne)1010 TEST_F(MultiThreadingHttpClientTest, threeByNineByOne) {
1011     size_t num_threads = 3;
1012     size_t num_batches = 9;
1013     threadRequestAndReceive(num_threads, num_batches);
1014 }
1015 
1016 // Multi-threaded with two threads, four batches, two listeners
TEST_F(MultiThreadingHttpClientTest,twoByFourByTwo)1017 TEST_F(MultiThreadingHttpClientTest, twoByFourByTwo) {
1018     size_t num_threads = 2;
1019     size_t num_batches = 4;
1020     size_t num_listeners = 2;
1021     threadRequestAndReceive(num_threads, num_batches, num_listeners);
1022 }
1023 
1024 // Multi-threaded with four threads, four batches, two listeners
TEST_F(MultiThreadingHttpClientTest,fourByFourByTwo)1025 TEST_F(MultiThreadingHttpClientTest, fourByFourByTwo) {
1026     size_t num_threads = 4;
1027     size_t num_batches = 4;
1028     size_t num_listeners = 2;
1029     threadRequestAndReceive(num_threads, num_batches, num_listeners);
1030 }
1031 
1032 // Verifies that we can cleanly pause, resume, and shutdown while doing
1033 // multi-threaded work.
TEST_F(MultiThreadingHttpClientTest,workPauseResumeShutdown)1034 TEST_F(MultiThreadingHttpClientTest, workPauseResumeShutdown) {
1035     size_t num_threads = 4;
1036     size_t num_batches = 4;
1037     size_t num_listeners = 4;
1038     size_t num_pauses = 3;
1039     workPauseResumeShutdown(num_threads, num_batches, num_listeners, num_pauses);
1040 }
1041 
1042 } // end of anonymous namespace
1043