1 /*****************************************************************************
2  *  $Id: worker.hpp 619524 2020-11-05 19:41:53Z saprykin $
3  * ===========================================================================
4  *
5  *                            PUBLIC DOMAIN NOTICE
6  *               National Center for Biotechnology Information
7  *
8  *  This software/database is a "United States Government Work" under the
9  *  terms of the United States Copyright Act.  It was written as part of
10  *  the author's official duties as a United States Government employee and
11  *  thus cannot be copyrighted.  This software/database is freely available
12  *  to the public for use. The National Library of Medicine and the U.S.
13  *  Government have not placed any restriction on its use or reproduction.
14  *
15  *  Although all reasonable efforts have been taken to ensure the accuracy
16  *  and reliability of the software and data, the NLM and the U.S.
17  *  Government do not and cannot warrant the performance or results that
18  *  may be obtained by using this software or data. The NLM and the U.S.
19  *  Government disclaim all warranties, express or implied, including
20  *  warranties of performance, merchantability or fitness for any particular
21  *  purpose.
22  *
23  *  Please cite the author in any work or product based on this material.
24  *
25  *  Db Cassandra: class executing single thread of cassandra fullscan.
26  *
27  */
28 
29 #ifndef OBJTOOLS__PUBSEQ_GATEWAY__IMPL__CASSANDRA__FULLSCAN__WORKER_HPP
30 #define OBJTOOLS__PUBSEQ_GATEWAY__IMPL__CASSANDRA__FULLSCAN__WORKER_HPP
31 
32 #include <corelib/ncbistl.hpp>
33 #include <cassandra.h>
34 
35 #include <objtools/pubseq_gateway/impl/cassandra/IdCassScope.hpp>
36 #include <objtools/pubseq_gateway/impl/cassandra/cass_driver.hpp>
37 #include <objtools/pubseq_gateway/impl/cassandra/SyncObj.hpp>
38 #include <objtools/pubseq_gateway/impl/cassandra/fullscan/consumer.hpp>
39 #include <objtools/pubseq_gateway/impl/cassandra/fullscan/plan.hpp>
40 
41 #include <memory>
42 #include <string>
43 #include <functional>
44 #include <atomic>
45 #include <utility>
46 #include <vector>
47 
48 BEGIN_IDBLOB_SCOPE
49 USING_NCBI_SCOPE;
50 
51 class CCassandraFullscanWorker
52 {
53     struct SQueryContext
54         : public CCassDataCallbackReceiver
55     {
56         SQueryContext(const SQueryContext &) = delete;
57         SQueryContext & operator=(const SQueryContext &) = delete;
58         SQueryContext(SQueryContext &&) = default;
59         SQueryContext & operator=(SQueryContext &&) = default;
60 
SQueryContextCCassandraFullscanWorker::SQueryContext61         SQueryContext()
62             : SQueryContext(nullptr, nullptr, 0)
63         {}
SQueryContextCCassandraFullscanWorker::SQueryContext64         SQueryContext(CCassandraFullscanPlan::TQueryPtr q, shared_ptr<CFutex> ready, unsigned int max_retries)
65             : query(move(q))
66             , data_ready(false)
67             , total_ready(ready)
68             , retires(max_retries)
69         {}
70 
OnDataCCassandraFullscanWorker::SQueryContext71         void OnData() override
72         {
73             data_ready = true;
74             total_ready->Inc();
75         }
76 
~SQueryContextCCassandraFullscanWorker::SQueryContext77         ~SQueryContext()
78         {
79             if (query) {
80                 query->Close();
81                 query = nullptr;
82             }
83         }
84 
85         CCassandraFullscanPlan::TQueryPtr query;
86         atomic_bool data_ready;
87         shared_ptr<CFutex> total_ready;
88         unsigned int retires;
89     };
90 
91  public:
92     using TTaskProvider = function<CCassandraFullscanPlan::TQueryPtr()>;
93 
94     CCassandraFullscanWorker();
95     CCassandraFullscanWorker(CCassandraFullscanWorker&&) = default;
96     CCassandraFullscanWorker(const CCassandraFullscanWorker&) = delete;
97     CCassandraFullscanWorker& operator=(CCassandraFullscanWorker&&) = default;
98     CCassandraFullscanWorker& operator=(const CCassandraFullscanWorker&) = delete;
99 
100     CCassandraFullscanWorker& SetConsistency(CassConsistency value);
101     CCassandraFullscanWorker& SetPageSize(unsigned int value);
102     CCassandraFullscanWorker& SetRowConsumer(unique_ptr<ICassandraFullscanConsumer> consumer);
103     CCassandraFullscanWorker& SetMaxActiveStatements(unsigned int value);
104     CCassandraFullscanWorker& SetTaskProvider(TTaskProvider provider);
105     CCassandraFullscanWorker& SetMaxRetryCount(unsigned int max_retry_count);
106 
107     void operator()();
108     bool IsFinished() const;
109     bool HadError() const;
110     string GetFirstError() const;
111 
112  private:
113     bool StartQuery(size_t index);
114     bool ProcessQueryResult(size_t index);
115     CCassandraFullscanPlan::TQueryPtr GetNextTask();
116     void ProcessError(string const & msg);
117     void ProcessError(exception const & e);
118 
119     unique_ptr<ICassandraFullscanConsumer> m_RowConsumer;
120     CassConsistency m_Consistency;
121     unsigned int m_PageSize;
122     unsigned int m_MaxActiveStatements;
123     TTaskProvider m_TaskProvider;
124 
125     vector<shared_ptr<SQueryContext>> m_Queries;
126     shared_ptr<CFutex> m_ReadyQueries;
127     unique_ptr<atomic_long> m_ActiveQueries;
128     unsigned int m_QueryMaxRetryCount;
129     bool m_Finished;
130     bool m_HadError;
131     string m_FirstError;
132 };
133 
134 END_IDBLOB_SCOPE
135 
136 #endif  // OBJTOOLS__PUBSEQ_GATEWAY__IMPL__CASSANDRA__FULLSCAN__WORKER_HPP
137