1 /* $Id: fullscan_runner.cpp 619524 2020-11-05 19:41:53Z saprykin $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Author: Dmitrii Saprykin, NCBI
27 *
28 * File Description:
29 * Unit test suite to check fullscan runner operations
30 *
31 * ===========================================================================
32 */
33
34 #include <ncbi_pch.hpp>
35
36 #include <corelib/ncbireg.hpp>
37 #include <objtools/pubseq_gateway/impl/cassandra/cass_driver.hpp>
38 #include <objtools/pubseq_gateway/impl/cassandra/cass_factory.hpp>
39 #include <objtools/pubseq_gateway/impl/cassandra/fullscan/runner.hpp>
40
41 #include <gtest/gtest.h>
42
43 #include <memory>
44 #include <string>
45 #include <vector>
46 #include <thread>
47 #include <utility>
48 #include <set>
49 #include <map>
50
51 #include "fullscan_plan_mock.hpp"
52
53 namespace {
54
55 USING_NCBI_SCOPE;
56 USING_IDBLOB_SCOPE;
57
58 using ::testing::Return;
59
60 class CCassandraFullscanRunnerTest
61 : public testing::Test
62 {
63 public:
CCassandraFullscanRunnerTest()64 CCassandraFullscanRunnerTest()
65 : m_KeyspaceName("test_ipg_storage_entrez")
66 , m_TableName("ipg_report")
67 {}
68
69 protected:
SetUpTestCase()70 static void SetUpTestCase() {
71 const string config_section = "TEST";
72 CNcbiRegistry r;
73 r.Set(config_section, "service", string(m_TestClusterName), IRegistry::fPersistent);
74 //r.Set(config_section, "numthreadsio", "1", IRegistry::fPersistent);
75 m_Factory = CCassConnectionFactory::s_Create();
76 m_Factory->LoadConfig(r, config_section);
77 m_Connection = m_Factory->CreateInstance();
78 m_Connection->Connect();
79 }
80
TearDownTestCase()81 static void TearDownTestCase() {
82 m_Connection->Close();
83 m_Connection = nullptr;
84 m_Factory = nullptr;
85 }
86
87 static const char* m_TestClusterName;
88 static shared_ptr<CCassConnectionFactory> m_Factory;
89 static shared_ptr<CCassConnection> m_Connection;
90
91 string m_KeyspaceName;
92 string m_TableName;
93 };
94
95 const char* CCassandraFullscanRunnerTest::m_TestClusterName = "ID_CASS_TEST";
96 shared_ptr<CCassConnectionFactory> CCassandraFullscanRunnerTest::m_Factory(nullptr);
97 shared_ptr<CCassConnection> CCassandraFullscanRunnerTest::m_Connection(nullptr);
98
99 struct SConsumeContext {
100 CFastMutex mutex;
101 bool tick_result = true;
102 bool read_result = true;
103 size_t tick_called = 0;
104 size_t finalize_called = 0;
105 size_t read_called = 0;
106 bool finalize_should_throw = false;
107 bool read_should_throw = false;
108 set<thread::id> thread_ids;
109 map<int64_t, size_t> ids_returned;
110
111 size_t max_consequtive_rows = 0;
112 size_t consequtive_rows = 0;
113 };
114
115 class CSimpleRowConsumer
116 : public ICassandraFullscanConsumer
117 {
118 public:
CSimpleRowConsumer(SConsumeContext * context)119 explicit CSimpleRowConsumer(SConsumeContext * context)
120 : m_Context(context)
121 {
122 }
123
Tick()124 bool Tick() override
125 {
126 if (m_Context) {
127 CFastMutexGuard _(m_Context->mutex);
128 m_Context->thread_ids.insert(this_thread::get_id());
129 ++m_Context->tick_called;
130 if (m_Context->consequtive_rows > m_Context->max_consequtive_rows) {
131 m_Context->max_consequtive_rows = m_Context->consequtive_rows;
132 }
133 m_Context->consequtive_rows = 0;
134 }
135 EXPECT_EQ(true, ICassandraFullscanConsumer::Tick())
136 << "Base Tick funcion should return true always";
137 return m_Context ? m_Context->tick_result : true;
138 }
139
ReadRow(CCassQuery const & query)140 bool ReadRow(CCassQuery const & query) override
141 {
142 if (m_Context) {
143 CFastMutexGuard _(m_Context->mutex);
144 m_Context->thread_ids.insert(this_thread::get_id());
145 ++m_Context->ids_returned[query.FieldGetInt64Value(0)];
146 ++m_Context->consequtive_rows;
147 ++m_Context->read_called;
148 if (m_Context->read_should_throw) {
149 NCBI_THROW(CCassandraException, eFatal, "Finalize may throw");
150 }
151 }
152 return m_Context ? m_Context->read_result : true;
153 }
154
Finalize()155 void Finalize() override
156 {
157 if (m_Context) {
158 CFastMutexGuard _(m_Context->mutex);
159 ++m_Context->finalize_called;
160
161 if (m_Context->finalize_should_throw) {
162 NCBI_THROW(CCassandraException, eFatal, "Finalize may throw");
163 }
164 }
165 }
166
167 SConsumeContext* m_Context;
168 };
169
make_default_plan_mock()170 unique_ptr<MockCassandraFullscanPlan> make_default_plan_mock()
171 {
172 auto plan_mock = make_unique<MockCassandraFullscanPlan>();
173 EXPECT_CALL(*plan_mock, Generate())
174 .Times(1);
175 EXPECT_CALL(*plan_mock, GetQueryCount())
176 .WillOnce(Return(1UL));
177 return plan_mock;
178 }
179
// A default-constructed runner has neither an execution plan nor a consumer
// factory; Execute() is expected to throw.
TEST_F(CCassandraFullscanRunnerTest, NonConfiguredRunnerTest) {
    CCassandraFullscanRunner bare_runner;
    EXPECT_THROW(bare_runner.Execute(), CCassandraException)
        << "Execute should throw without configuration";
}
185
// The plan hands out a query that never had SQL assigned; Execute() is
// expected to throw on it. Also checks that an already configured runner
// rejects a second SetExecutionPlan() call.
TEST_F(CCassandraFullscanRunnerTest, BrokenQueryRunnerTest) {
    auto plan_mock = make_default_plan_mock();
    auto sql_less_query = m_Connection->NewQuery();
    EXPECT_CALL(*plan_mock, GetNextQuery())
        .WillOnce(Return(sql_less_query));

    auto null_consumer_factory = [] {
        return make_unique<CSimpleRowConsumer>(nullptr);
    };

    CCassandraFullscanRunner runner;
    runner.SetThreadCount(4);
    runner.SetExecutionPlan(move(plan_mock));
    runner.SetConsumerFactory(null_consumer_factory);

    EXPECT_THROW(runner.SetExecutionPlan(nullptr), CCassandraException)
        << "SetExecutionPlan should throw when called second time";

    EXPECT_THROW(runner.Execute(), CCassandraException)
        << "Execute should throw with broken query";
}
206
// Four worker threads but only one allowed active statement: Execute() is
// expected to reject this configuration without ever asking the plan for a
// query.
TEST_F(CCassandraFullscanRunnerTest, AtLeastOneActiveStatementPerThread) {
    auto plan = make_unique<MockCassandraFullscanPlan>();
    EXPECT_CALL(*plan, Generate()).Times(1);
    EXPECT_CALL(*plan, GetQueryCount()).WillOnce(Return(4UL));
    EXPECT_CALL(*plan, GetNextQuery()).Times(0);

    SConsumeContext context;
    CCassandraFullscanRunner runner;
    runner.SetThreadCount(4);
    runner.SetExecutionPlan(move(plan));
    runner.SetMaxActiveStatements(1);
    runner.SetConsumerFactory([&context] {
        return make_unique<CSimpleRowConsumer>(&context);
    });

    EXPECT_THROW(runner.Execute(), CCassandraException)
        << "Execute should throw on wrong max_active_statements";
}
231
// A one-query plan: even with 4 threads allowed, the test expects a single
// worker running on the calling thread that reads the one matching row.
TEST_F(CCassandraFullscanRunnerTest, OneThreadRunnerTest) {
    SConsumeContext context;

    auto single_query = m_Connection->NewQuery();
    single_query->SetSQL("select ipg from test_ipg_storage_entrez.ipg_report WHERE ipg = 160755002", 0);

    auto plan_mock = make_default_plan_mock();
    EXPECT_CALL(*plan_mock, GetNextQuery())
        .WillOnce(Return(single_query))
        .WillRepeatedly(Return(nullptr));

    CCassandraFullscanRunner runner;
    runner.SetThreadCount(4);
    runner.SetExecutionPlan(move(plan_mock));
    runner.SetConsumerFactory([&context] {
        return make_unique<CSimpleRowConsumer>(&context);
    });

    EXPECT_TRUE(runner.Execute());

    // Exactly one thread participated, and it was this one.
    EXPECT_EQ(1UL, context.thread_ids.size());
    EXPECT_EQ(1UL, context.thread_ids.count(this_thread::get_id()));
    // One distinct id, seen once, and one finalized consumer.
    EXPECT_EQ(1UL, context.ids_returned.size());
    EXPECT_EQ(1UL, context.ids_returned[160755002]);
    EXPECT_EQ(1UL, context.finalize_called);
}
259
// Three queries, up to 4 threads: the test expects three workers (one on the
// calling thread), all rows of every query delivered, and three Finalize()
// calls. GetNextQuery() yields the three queries and then nullptr; six calls
// in total are expected.
TEST_F(CCassandraFullscanRunnerTest, MultiThreadRunnerTest) {
    auto first_query = m_Connection->NewQuery();
    first_query->SetSQL("select ipg from test_ipg_storage_entrez.ipg_report WHERE ipg = 160755002", 0);

    auto second_query = m_Connection->NewQuery();
    second_query->SetSQL("select ipg from test_ipg_storage_entrez.ipg_report WHERE ipg = 103317145", 0);

    auto third_query = m_Connection->NewQuery();
    third_query->SetSQL("select ipg from test_ipg_storage_entrez.ipg_report WHERE ipg = 15724717", 0);

    auto plan_mock = make_unique<MockCassandraFullscanPlan>();
    EXPECT_CALL(*plan_mock, Generate()).Times(1);
    EXPECT_CALL(*plan_mock, GetQueryCount()).WillOnce(Return(3UL));
    EXPECT_CALL(*plan_mock, GetNextQuery())
        .Times(6)
        .WillOnce(Return(first_query))
        .WillOnce(Return(second_query))
        .WillOnce(Return(third_query))
        .WillRepeatedly(Return(nullptr));

    SConsumeContext context;
    CCassandraFullscanRunner runner;
    runner.SetThreadCount(4);
    runner.SetExecutionPlan(move(plan_mock));
    runner.SetConsumerFactory([&context] {
        return make_unique<CSimpleRowConsumer>(&context);
    });

    EXPECT_TRUE(runner.Execute());

    EXPECT_EQ(3UL, context.thread_ids.size());
    EXPECT_EQ(1UL, context.thread_ids.count(this_thread::get_id()))
        << "One worker should be executed in current thread";

    EXPECT_EQ(3UL, context.ids_returned.size());
    EXPECT_EQ(1UL, context.ids_returned[160755002]);
    EXPECT_EQ(1UL, context.ids_returned[103317145]);
    EXPECT_EQ(3UL, context.ids_returned[15724717]);
    EXPECT_EQ(3UL, context.finalize_called);
}
301
// Consumers whose Finalize() throws: the exception is expected to propagate
// out of Execute().
TEST_F(CCassandraFullscanRunnerTest, FinalizeMayThrowTest) {
    SConsumeContext context;
    context.finalize_should_throw = true;

    auto query_a = m_Connection->NewQuery();
    query_a->SetSQL("select ipg from test_ipg_storage_entrez.ipg_report WHERE ipg = 160755002", 0);
    auto query_b = m_Connection->NewQuery();
    query_b->SetSQL("select ipg from test_ipg_storage_entrez.ipg_report WHERE ipg = 160755002", 0);

    auto plan_mock = make_default_plan_mock();
    EXPECT_CALL(*plan_mock, GetNextQuery())
        .WillOnce(Return(query_a))
        .WillOnce(Return(query_b))
        .WillOnce(Return(nullptr));

    CCassandraFullscanRunner runner;
    runner.SetThreadCount(4);
    runner.SetExecutionPlan(move(plan_mock));
    runner.SetConsumerFactory([&context] {
        return make_unique<CSimpleRowConsumer>(&context);
    });

    EXPECT_THROW(runner.Execute(), CCassandraException)
        << "Execute should throw with finalize throw";
}
329
// Consumers whose ReadRow() throws: the exception is expected to propagate
// out of Execute().
TEST_F(CCassandraFullscanRunnerTest, ReadMayThrowTest) {
    SConsumeContext context;
    context.read_should_throw = true;

    auto ipg_query = m_Connection->NewQuery();
    ipg_query->SetSQL("select ipg from test_ipg_storage_entrez.ipg_report WHERE ipg = 160755002", 0);

    auto plan_mock = make_default_plan_mock();
    EXPECT_CALL(*plan_mock, GetNextQuery())
        .WillOnce(Return(ipg_query))
        .WillOnce(Return(nullptr));

    CCassandraFullscanRunner runner;
    runner.SetThreadCount(4);
    runner.SetExecutionPlan(move(plan_mock));
    runner.SetConsumerFactory([&context] {
        return make_unique<CSimpleRowConsumer>(&context);
    });

    EXPECT_THROW(runner.Execute(), CCassandraException)
        << "Execute should throw after read throw";
}
354
// ReadRow() returning false should stop the scan: Execute() returns false
// and only the first of the query's rows is read.
TEST_F(CCassandraFullscanRunnerTest, ReadMayReturnFalseTest) {
    SConsumeContext context;
    context.read_result = false;

    auto ipg_query = m_Connection->NewQuery();
    ipg_query->SetSQL("select ipg from test_ipg_storage_entrez.ipg_report WHERE ipg = 15724717", 0);

    auto plan_mock = make_default_plan_mock();
    EXPECT_CALL(*plan_mock, GetNextQuery())
        .WillOnce(Return(ipg_query))
        .WillOnce(Return(nullptr));

    CCassandraFullscanRunner runner;
    runner.SetThreadCount(4);
    runner.SetExecutionPlan(move(plan_mock));
    runner.SetConsumerFactory([&context] {
        return make_unique<CSimpleRowConsumer>(&context);
    });

    EXPECT_FALSE(runner.Execute())
        << "Execute should return false after any read call returns false";
    EXPECT_EQ(1UL, context.read_called);
}
380
// Tick() returning false should stop the scan. With a single worker thread
// and two queries, Execute() is expected to return false after the first
// Tick() call.
TEST_F(CCassandraFullscanRunnerTest, TickMayReturnFalseTest) {
    auto query = m_Connection->NewQuery();
    query->SetSQL("select ipg from test_ipg_storage_entrez.ipg_report WHERE ipg = 15724717", 0);
    auto query1 = m_Connection->NewQuery();
    query1->SetSQL("select ipg from test_ipg_storage_entrez.ipg_report WHERE ipg = 160755002", 0);

    auto plan_mock = make_default_plan_mock();
    EXPECT_CALL(*plan_mock, GetNextQuery())
        .WillOnce(Return(query))
        .WillOnce(Return(query1))
        .WillOnce(Return(nullptr));

    SConsumeContext context;
    context.tick_result = false;
    CCassandraFullscanRunner runner;
    runner
        .SetThreadCount(1)
        .SetExecutionPlan(move(plan_mock))
        .SetConsumerFactory(
            [&context] {
                return make_unique<CSimpleRowConsumer>(&context);
            }
        );

    // The message previously referred to "read call" (copy-paste from
    // ReadMayReturnFalseTest); this test forces Tick() to return false.
    EXPECT_FALSE(runner.Execute())
        << "Execute should return false after any tick call returns false";
    EXPECT_EQ(1UL, context.tick_called);
}
409
// The ipg 15724717 group has 3 records. With a page size of 2, the longest
// run of ReadRow() calls between Tick() calls is expected to be 2.
TEST_F(CCassandraFullscanRunnerTest, ResultPagingTest) {
    SConsumeContext context;

    auto three_row_query = m_Connection->NewQuery();
    three_row_query->SetSQL("select ipg from test_ipg_storage_entrez.ipg_report WHERE ipg = 15724717", 0);

    auto plan_mock = make_default_plan_mock();
    EXPECT_CALL(*plan_mock, GetNextQuery())
        .WillOnce(Return(three_row_query))
        .WillOnce(Return(nullptr));

    CCassandraFullscanRunner runner;
    runner.SetThreadCount(1);
    runner.SetConsistency(CASS_CONSISTENCY_LOCAL_ONE);
    runner.SetPageSize(2);
    runner.SetExecutionPlan(move(plan_mock));
    runner.SetConsumerFactory([&context] {
        return make_unique<CSimpleRowConsumer>(&context);
    });

    EXPECT_TRUE(runner.Execute());
    EXPECT_EQ(2UL, context.max_consequtive_rows);
}
436
// End-to-end run over the fixture's keyspace/table. It uses a real
// CCassandraFullscanPlan selecting "ipg" and "accession", and consumers
// without a context, which accept every row.
TEST_F(CCassandraFullscanRunnerTest, SmokeTest) {
    auto real_plan = make_unique<CCassandraFullscanPlan>();
    real_plan->SetConnection(m_Connection);
    real_plan->SetFieldList({"ipg", "accession"});
    real_plan->SetKeyspace(m_KeyspaceName);
    real_plan->SetTable(m_TableName);

    auto accept_all_factory = [] {
        return make_unique<CSimpleRowConsumer>(nullptr);
    };

    CCassandraFullscanRunner runner;
    runner.SetThreadCount(4);
    runner.SetExecutionPlan(move(real_plan));
    runner.SetConsumerFactory(accept_all_factory);

    EXPECT_TRUE(runner.Execute());
}
456
457 } // namespace
458