1 /*  $Id: fullscan_runner.cpp 619524 2020-11-05 19:41:53Z saprykin $
2 * ===========================================================================
3 *
4 *                            PUBLIC DOMAIN NOTICE
5 *               National Center for Biotechnology Information
6 *
7 *  This software/database is a "United States Government Work" under the
8 *  terms of the United States Copyright Act.  It was written as part of
9 *  the author's official duties as a United States Government employee and
10 *  thus cannot be copyrighted.  This software/database is freely available
11 *  to the public for use. The National Library of Medicine and the U.S.
12 *  Government have not placed any restriction on its use or reproduction.
13 *
14 *  Although all reasonable efforts have been taken to ensure the accuracy
15 *  and reliability of the software and data, the NLM and the U.S.
16 *  Government do not and cannot warrant the performance or results that
17 *  may be obtained by using this software or data. The NLM and the U.S.
18 *  Government disclaim all warranties, express or implied, including
19 *  warranties of performance, merchantability or fitness for any particular
20 *  purpose.
21 *
22 *  Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Author:  Dmitrii Saprykin, NCBI
27 *
28 * File Description:
29 *   Unit test suite to check fullscan runner operations
30 *
31 * ===========================================================================
32 */
33 
34 #include <ncbi_pch.hpp>
35 
36 #include <corelib/ncbireg.hpp>
37 #include <objtools/pubseq_gateway/impl/cassandra/cass_driver.hpp>
38 #include <objtools/pubseq_gateway/impl/cassandra/cass_factory.hpp>
39 #include <objtools/pubseq_gateway/impl/cassandra/fullscan/runner.hpp>
40 
41 #include <gtest/gtest.h>
42 
43 #include <memory>
44 #include <string>
45 #include <vector>
46 #include <thread>
47 #include <utility>
48 #include <set>
49 #include <map>
50 
51 #include "fullscan_plan_mock.hpp"
52 
53 namespace {
54 
55 USING_NCBI_SCOPE;
56 USING_IDBLOB_SCOPE;
57 
58 using ::testing::Return;
59 
60 class CCassandraFullscanRunnerTest
61     : public testing::Test
62 {
63  public:
CCassandraFullscanRunnerTest()64     CCassandraFullscanRunnerTest()
65      : m_KeyspaceName("test_ipg_storage_entrez")
66      , m_TableName("ipg_report")
67     {}
68 
69  protected:
SetUpTestCase()70     static void SetUpTestCase() {
71         const string config_section = "TEST";
72         CNcbiRegistry r;
73         r.Set(config_section, "service", string(m_TestClusterName), IRegistry::fPersistent);
74         //r.Set(config_section, "numthreadsio", "1", IRegistry::fPersistent);
75         m_Factory = CCassConnectionFactory::s_Create();
76         m_Factory->LoadConfig(r, config_section);
77         m_Connection = m_Factory->CreateInstance();
78         m_Connection->Connect();
79     }
80 
TearDownTestCase()81     static void TearDownTestCase() {
82         m_Connection->Close();
83         m_Connection = nullptr;
84         m_Factory = nullptr;
85     }
86 
87     static const char* m_TestClusterName;
88     static shared_ptr<CCassConnectionFactory> m_Factory;
89     static shared_ptr<CCassConnection> m_Connection;
90 
91     string m_KeyspaceName;
92     string m_TableName;
93 };
94 
95 const char* CCassandraFullscanRunnerTest::m_TestClusterName = "ID_CASS_TEST";
96 shared_ptr<CCassConnectionFactory> CCassandraFullscanRunnerTest::m_Factory(nullptr);
97 shared_ptr<CCassConnection> CCassandraFullscanRunnerTest::m_Connection(nullptr);
98 
99 struct SConsumeContext {
100     CFastMutex mutex;
101     bool tick_result = true;
102     bool read_result = true;
103     size_t tick_called = 0;
104     size_t finalize_called = 0;
105     size_t read_called = 0;
106     bool finalize_should_throw = false;
107     bool read_should_throw = false;
108     set<thread::id> thread_ids;
109     map<int64_t, size_t> ids_returned;
110 
111     size_t max_consequtive_rows = 0;
112     size_t consequtive_rows = 0;
113 };
114 
115 class CSimpleRowConsumer
116     : public ICassandraFullscanConsumer
117 {
118  public:
CSimpleRowConsumer(SConsumeContext * context)119     explicit CSimpleRowConsumer(SConsumeContext * context)
120         : m_Context(context)
121     {
122     }
123 
Tick()124     bool Tick() override
125     {
126         if (m_Context) {
127             CFastMutexGuard _(m_Context->mutex);
128             m_Context->thread_ids.insert(this_thread::get_id());
129             ++m_Context->tick_called;
130             if (m_Context->consequtive_rows > m_Context->max_consequtive_rows) {
131                 m_Context->max_consequtive_rows = m_Context->consequtive_rows;
132             }
133             m_Context->consequtive_rows = 0;
134         }
135         EXPECT_EQ(true, ICassandraFullscanConsumer::Tick())
136                 << "Base Tick funcion should return true always";
137         return m_Context ? m_Context->tick_result : true;
138     }
139 
ReadRow(CCassQuery const & query)140     bool ReadRow(CCassQuery const & query) override
141     {
142         if (m_Context) {
143             CFastMutexGuard _(m_Context->mutex);
144             m_Context->thread_ids.insert(this_thread::get_id());
145             ++m_Context->ids_returned[query.FieldGetInt64Value(0)];
146             ++m_Context->consequtive_rows;
147             ++m_Context->read_called;
148             if (m_Context->read_should_throw) {
149                 NCBI_THROW(CCassandraException, eFatal, "Finalize may throw");
150             }
151         }
152         return m_Context ? m_Context->read_result : true;
153     }
154 
Finalize()155     void Finalize() override
156     {
157         if (m_Context) {
158             CFastMutexGuard _(m_Context->mutex);
159             ++m_Context->finalize_called;
160 
161             if (m_Context->finalize_should_throw) {
162                 NCBI_THROW(CCassandraException, eFatal, "Finalize may throw");
163             }
164         }
165     }
166 
167     SConsumeContext* m_Context;
168 };
169 
make_default_plan_mock()170 unique_ptr<MockCassandraFullscanPlan> make_default_plan_mock()
171 {
172     auto plan_mock = make_unique<MockCassandraFullscanPlan>();
173     EXPECT_CALL(*plan_mock, Generate())
174         .Times(1);
175     EXPECT_CALL(*plan_mock, GetQueryCount())
176         .WillOnce(Return(1UL));
177     return plan_mock;
178 }
179 
// A runner with no plan and no consumer factory must refuse to execute.
TEST_F(CCassandraFullscanRunnerTest, NonConfiguredRunnerTest) {
    CCassandraFullscanRunner unconfigured_runner;
    EXPECT_THROW(unconfigured_runner.Execute(), CCassandraException)
        << "Execute should throw without configuration";
}
185 
// A query with no SQL must make Execute() throw. Also checks that an
// execution plan cannot be set twice on the same runner.
TEST_F(CCassandraFullscanRunnerTest, BrokenQueryRunnerTest) {
    auto empty_query = m_Connection->NewQuery();  // SQL intentionally not set
    auto plan_mock = make_default_plan_mock();
    EXPECT_CALL(*plan_mock, GetNextQuery()).WillOnce(Return(empty_query));

    CCassandraFullscanRunner runner;
    runner.SetThreadCount(4);
    runner.SetExecutionPlan(std::move(plan_mock));
    runner.SetConsumerFactory([] {
        return make_unique<CSimpleRowConsumer>(nullptr);
    });

    EXPECT_THROW(runner.SetExecutionPlan(nullptr), CCassandraException)
        << "SetExecutionPlan should throw when called second time";

    EXPECT_THROW(runner.Execute(), CCassandraException)
        << "Execute should throw with broken query";
}
206 
// Four threads with a cap of one active statement is an invalid setup:
// Execute() must throw without ever asking the plan for a query.
TEST_F(CCassandraFullscanRunnerTest, AtLeastOneActiveStatementPerThread) {
    auto plan_mock = make_unique<MockCassandraFullscanPlan>();
    auto& plan = *plan_mock;
    EXPECT_CALL(plan, Generate()).Times(1);
    EXPECT_CALL(plan, GetQueryCount()).WillOnce(Return(4UL));
    EXPECT_CALL(plan, GetNextQuery()).Times(0);

    SConsumeContext context;
    CCassandraFullscanRunner runner;
    runner.SetThreadCount(4);
    runner.SetExecutionPlan(std::move(plan_mock));
    runner.SetMaxActiveStatements(1);
    runner.SetConsumerFactory([&context] {
        return make_unique<CSimpleRowConsumer>(&context);
    });

    EXPECT_THROW(runner.Execute(), CCassandraException)
        << "Execute should throw on wrong max_active_statements";
}
231 
// Up to four threads are allowed, but the plan reports a single query, so
// all rows are expected to be consumed on the calling thread.
TEST_F(CCassandraFullscanRunnerTest, OneThreadRunnerTest) {
    auto single_query = m_Connection->NewQuery();
    single_query->SetSQL("select ipg from test_ipg_storage_entrez.ipg_report WHERE ipg = 160755002", 0);

    auto plan_mock = make_default_plan_mock();
    EXPECT_CALL(*plan_mock, GetNextQuery())
        .WillOnce(Return(single_query))
        .WillRepeatedly(Return(nullptr));

    SConsumeContext context;
    CCassandraFullscanRunner runner;
    runner.SetThreadCount(4);
    runner.SetExecutionPlan(std::move(plan_mock));
    runner.SetConsumerFactory([&context] {
        return make_unique<CSimpleRowConsumer>(&context);
    });

    EXPECT_TRUE(runner.Execute());

    // Only the calling thread touched the consumer.
    EXPECT_EQ(1UL, context.thread_ids.size());
    EXPECT_EQ(1UL, context.thread_ids.count(this_thread::get_id()));
    // One row for the requested ipg; Finalize() called once.
    EXPECT_EQ(1UL, context.ids_returned.size());
    EXPECT_EQ(1UL, context.ids_returned[160755002]);
    EXPECT_EQ(1UL, context.finalize_called);
}
259 
// Three queries on a four-thread runner: the work is expected to be spread
// over three distinct threads, one of which is the calling thread.
TEST_F(CCassandraFullscanRunnerTest, MultiThreadRunnerTest) {
    auto query_a = m_Connection->NewQuery();
    query_a->SetSQL("select ipg from test_ipg_storage_entrez.ipg_report WHERE ipg = 160755002", 0);
    auto query_b = m_Connection->NewQuery();
    query_b->SetSQL("select ipg from test_ipg_storage_entrez.ipg_report WHERE ipg = 103317145", 0);
    auto query_c = m_Connection->NewQuery();  // this ipg group has 3 rows
    query_c->SetSQL("select ipg from test_ipg_storage_entrez.ipg_report WHERE ipg = 15724717", 0);

    auto plan_mock = make_unique<MockCassandraFullscanPlan>();
    auto& plan = *plan_mock;
    EXPECT_CALL(plan, Generate()).Times(1);
    EXPECT_CALL(plan, GetQueryCount()).WillOnce(Return(3UL));
    // Three real queries, then nullptr (end of plan) for the remaining calls.
    EXPECT_CALL(plan, GetNextQuery())
        .Times(6)
        .WillOnce(Return(query_a))
        .WillOnce(Return(query_b))
        .WillOnce(Return(query_c))
        .WillRepeatedly(Return(nullptr));

    SConsumeContext context;
    CCassandraFullscanRunner runner;
    runner.SetThreadCount(4);
    runner.SetExecutionPlan(std::move(plan_mock));
    runner.SetConsumerFactory([&context] {
        return make_unique<CSimpleRowConsumer>(&context);
    });

    EXPECT_TRUE(runner.Execute());

    EXPECT_EQ(3UL, context.thread_ids.size());
    EXPECT_EQ(1UL, context.thread_ids.count(this_thread::get_id()))
        << "One worker should be executed in current thread";

    EXPECT_EQ(3UL, context.ids_returned.size());
    EXPECT_EQ(1UL, context.ids_returned[160755002]);
    EXPECT_EQ(1UL, context.ids_returned[103317145]);
    EXPECT_EQ(3UL, context.ids_returned[15724717]);
    EXPECT_EQ(3UL, context.finalize_called);
}
301 
// An exception thrown from a consumer's Finalize() must surface from Execute().
TEST_F(CCassandraFullscanRunnerTest, FinalizeMayThrowTest) {
    auto first_query = m_Connection->NewQuery();
    first_query->SetSQL("select ipg from test_ipg_storage_entrez.ipg_report WHERE ipg = 160755002", 0);
    auto second_query = m_Connection->NewQuery();
    second_query->SetSQL("select ipg from test_ipg_storage_entrez.ipg_report WHERE ipg = 160755002", 0);

    auto plan_mock = make_default_plan_mock();
    EXPECT_CALL(*plan_mock, GetNextQuery())
        .WillOnce(Return(first_query))
        .WillOnce(Return(second_query))
        .WillOnce(Return(nullptr));

    SConsumeContext context;
    context.finalize_should_throw = true;

    CCassandraFullscanRunner runner;
    runner.SetThreadCount(4);
    runner.SetExecutionPlan(std::move(plan_mock));
    runner.SetConsumerFactory([&context] {
        return make_unique<CSimpleRowConsumer>(&context);
    });

    EXPECT_THROW(runner.Execute(), CCassandraException)
        << "Execute should throw with finalize throw";
}
329 
// An exception thrown from a consumer's ReadRow() must surface from Execute().
TEST_F(CCassandraFullscanRunnerTest, ReadMayThrowTest) {
    auto row_query = m_Connection->NewQuery();
    row_query->SetSQL("select ipg from test_ipg_storage_entrez.ipg_report WHERE ipg = 160755002", 0);

    auto plan_mock = make_default_plan_mock();
    EXPECT_CALL(*plan_mock, GetNextQuery())
        .WillOnce(Return(row_query))
        .WillOnce(Return(nullptr));

    SConsumeContext context;
    context.read_should_throw = true;

    CCassandraFullscanRunner runner;
    runner.SetThreadCount(4);
    runner.SetExecutionPlan(std::move(plan_mock));
    runner.SetConsumerFactory([&context] {
        return make_unique<CSimpleRowConsumer>(&context);
    });

    EXPECT_THROW(runner.Execute(), CCassandraException)
        << "Execute should throw after read throw";
}
354 
// ReadRow() returning false on the first row must stop the scan and make
// Execute() report failure; no further rows are read.
TEST_F(CCassandraFullscanRunnerTest, ReadMayReturnFalseTest) {
    auto multi_row_query = m_Connection->NewQuery();
    multi_row_query->SetSQL("select ipg from test_ipg_storage_entrez.ipg_report WHERE ipg = 15724717", 0);

    auto plan_mock = make_default_plan_mock();
    EXPECT_CALL(*plan_mock, GetNextQuery())
        .WillOnce(Return(multi_row_query))
        .WillOnce(Return(nullptr));

    SConsumeContext context;
    context.read_result = false;

    CCassandraFullscanRunner runner;
    runner.SetThreadCount(4);
    runner.SetExecutionPlan(std::move(plan_mock));
    runner.SetConsumerFactory([&context] {
        return make_unique<CSimpleRowConsumer>(&context);
    });

    EXPECT_FALSE(runner.Execute())
        << "Execute should return false after any read call returns false";
    EXPECT_EQ(1UL, context.read_called);
}
380 
// With a single thread, Tick() returning false must make Execute() report
// failure, and Tick() is expected to be called only once.
TEST_F(CCassandraFullscanRunnerTest, TickMayReturnFalseTest) {
    auto query = m_Connection->NewQuery();
    query->SetSQL("select ipg from test_ipg_storage_entrez.ipg_report WHERE ipg = 15724717", 0);
    auto query1 = m_Connection->NewQuery();
    query1->SetSQL("select ipg from test_ipg_storage_entrez.ipg_report WHERE ipg = 160755002", 0);

    auto plan_mock = make_default_plan_mock();
    EXPECT_CALL(*plan_mock, GetNextQuery())
        .WillOnce(Return(query))
        .WillOnce(Return(query1))
        .WillOnce(Return(nullptr));

    SConsumeContext context;
    context.tick_result = false;
    CCassandraFullscanRunner runner;
    runner
        .SetThreadCount(1)
        .SetExecutionPlan(move(plan_mock))
        .SetConsumerFactory(
            [&context] {
                return make_unique<CSimpleRowConsumer>(&context);
            }
        );

    // This test exercises Tick(), not ReadRow(); the failure message used to
    // be a copy-paste of the ReadRow test's message.
    EXPECT_FALSE(runner.Execute())
        << "Execute should return false after any tick call returns false";
    EXPECT_EQ(1UL, context.tick_called);
}
409 
// With a page size of 2, a 3-row result must be delivered in pages: no more
// than two rows are read between consecutive Tick() calls.
TEST_F(CCassandraFullscanRunnerTest, ResultPagingTest) {
    auto paged_query = m_Connection->NewQuery();
    // this group has 3 records
    paged_query->SetSQL("select ipg from test_ipg_storage_entrez.ipg_report WHERE ipg = 15724717", 0);

    auto plan_mock = make_default_plan_mock();
    EXPECT_CALL(*plan_mock, GetNextQuery())
        .WillOnce(Return(paged_query))
        .WillOnce(Return(nullptr));

    SConsumeContext context;
    CCassandraFullscanRunner runner;
    runner.SetThreadCount(1);
    runner.SetConsistency(CASS_CONSISTENCY_LOCAL_ONE);
    runner.SetPageSize(2);
    runner.SetExecutionPlan(std::move(plan_mock));
    runner.SetConsumerFactory([&context] {
        return make_unique<CSimpleRowConsumer>(&context);
    });

    EXPECT_TRUE(runner.Execute());
    EXPECT_EQ(2UL, context.max_consequtive_rows);
}
436 
// End-to-end run with a real (non-mock) plan over the fixture's table, using
// no-op consumers; the scan must simply succeed.
TEST_F(CCassandraFullscanRunnerTest, SmokeTest) {
    auto real_plan = make_unique<CCassandraFullscanPlan>();
    real_plan->SetConnection(m_Connection);
    real_plan->SetFieldList({"ipg", "accession"});
    real_plan->SetKeyspace(m_KeyspaceName);
    real_plan->SetTable(m_TableName);

    CCassandraFullscanRunner runner;
    runner.SetThreadCount(4);
    runner.SetExecutionPlan(std::move(real_plan));
    runner.SetConsumerFactory([] {
        return make_unique<CSimpleRowConsumer>(nullptr);
    });
    EXPECT_TRUE(runner.Execute());
}
456 
457 }  // namespace
458