1 /***************************************************************************** 2 * $Id: worker.hpp 619524 2020-11-05 19:41:53Z saprykin $ 3 * =========================================================================== 4 * 5 * PUBLIC DOMAIN NOTICE 6 * National Center for Biotechnology Information 7 * 8 * This software/database is a "United States Government Work" under the 9 * terms of the United States Copyright Act. It was written as part of 10 * the author's official duties as a United States Government employee and 11 * thus cannot be copyrighted. This software/database is freely available 12 * to the public for use. The National Library of Medicine and the U.S. 13 * Government have not placed any restriction on its use or reproduction. 14 * 15 * Although all reasonable efforts have been taken to ensure the accuracy 16 * and reliability of the software and data, the NLM and the U.S. 17 * Government do not and cannot warrant the performance or results that 18 * may be obtained by using this software or data. The NLM and the U.S. 19 * Government disclaim all warranties, express or implied, including 20 * warranties of performance, merchantability or fitness for any particular 21 * purpose. 22 * 23 * Please cite the author in any work or product based on this material. 24 * 25 * Db Cassandra: class executing single thread of cassandra fullscan. 26 * 27 */ 28 29 #ifndef OBJTOOLS__PUBSEQ_GATEWAY__IMPL__CASSANDRA__FULLSCAN__WORKER_HPP 30 #define OBJTOOLS__PUBSEQ_GATEWAY__IMPL__CASSANDRA__FULLSCAN__WORKER_HPP 31 32 #include <corelib/ncbistl.hpp> 33 #include <cassandra.h> 34 35 #include <objtools/pubseq_gateway/impl/cassandra/IdCassScope.hpp> 36 #include <objtools/pubseq_gateway/impl/cassandra/cass_driver.hpp> 37 #include <objtools/pubseq_gateway/impl/cassandra/SyncObj.hpp> 38 #include <objtools/pubseq_gateway/impl/cassandra/fullscan/consumer.hpp> 39 #include <objtools/pubseq_gateway/impl/cassandra/fullscan/plan.hpp> 40 41 #include <memory> 42 #include <string> 43 #include <functional> 44 #include <atomic> 45 #include <utility> 46 #include <vector> 47 48 BEGIN_IDBLOB_SCOPE 49 USING_NCBI_SCOPE; 50 51 class CCassandraFullscanWorker 52 { 53 struct SQueryContext 54 : public CCassDataCallbackReceiver 55 { 56 SQueryContext(const SQueryContext &) = delete; 57 SQueryContext & operator=(const SQueryContext &) = delete; 58 SQueryContext(SQueryContext &&) = default; 59 SQueryContext & operator=(SQueryContext &&) = default; 60 SQueryContextCCassandraFullscanWorker::SQueryContext61 SQueryContext() 62 : SQueryContext(nullptr, nullptr, 0) 63 {} SQueryContextCCassandraFullscanWorker::SQueryContext64 SQueryContext(CCassandraFullscanPlan::TQueryPtr q, shared_ptr<CFutex> ready, unsigned int max_retries) 65 : query(move(q)) 66 , data_ready(false) 67 , total_ready(ready) 68 , retires(max_retries) 69 {} 70 OnDataCCassandraFullscanWorker::SQueryContext71 void OnData() override 72 { 73 data_ready = true; 74 total_ready->Inc(); 75 } 76 ~SQueryContextCCassandraFullscanWorker::SQueryContext77 ~SQueryContext() 78 { 79 if (query) { 80 query->Close(); 81 query = nullptr; 82 } 83 } 84 85 CCassandraFullscanPlan::TQueryPtr query; 86 atomic_bool data_ready; 87 shared_ptr<CFutex> total_ready; 88 unsigned int retires; 89 }; 90 91 public: 92 using TTaskProvider = function<CCassandraFullscanPlan::TQueryPtr()>; 93 94 CCassandraFullscanWorker(); 95 CCassandraFullscanWorker(CCassandraFullscanWorker&&) = default; 96 CCassandraFullscanWorker(const CCassandraFullscanWorker&) = delete; 97 CCassandraFullscanWorker& operator=(CCassandraFullscanWorker&&) = default; 98 CCassandraFullscanWorker& operator=(const CCassandraFullscanWorker&) = delete; 99 100 CCassandraFullscanWorker& SetConsistency(CassConsistency value); 101 CCassandraFullscanWorker& SetPageSize(unsigned int value); 102 CCassandraFullscanWorker& SetRowConsumer(unique_ptr<ICassandraFullscanConsumer> consumer); 103 CCassandraFullscanWorker& SetMaxActiveStatements(unsigned int value); 104 CCassandraFullscanWorker& SetTaskProvider(TTaskProvider provider); 105 CCassandraFullscanWorker& SetMaxRetryCount(unsigned int max_retry_count); 106 107 void operator()(); 108 bool IsFinished() const; 109 bool HadError() const; 110 string GetFirstError() const; 111 112 private: 113 bool StartQuery(size_t index); 114 bool ProcessQueryResult(size_t index); 115 CCassandraFullscanPlan::TQueryPtr GetNextTask(); 116 void ProcessError(string const & msg); 117 void ProcessError(exception const & e); 118 119 unique_ptr<ICassandraFullscanConsumer> m_RowConsumer; 120 CassConsistency m_Consistency; 121 unsigned int m_PageSize; 122 unsigned int m_MaxActiveStatements; 123 TTaskProvider m_TaskProvider; 124 125 vector<shared_ptr<SQueryContext>> m_Queries; 126 shared_ptr<CFutex> m_ReadyQueries; 127 unique_ptr<atomic_long> m_ActiveQueries; 128 unsigned int m_QueryMaxRetryCount; 129 bool m_Finished; 130 bool m_HadError; 131 string m_FirstError; 132 }; 133 134 END_IDBLOB_SCOPE 135 136 #endif // OBJTOOLS__PUBSEQ_GATEWAY__IMPL__CASSANDRA__FULLSCAN__WORKER_HPP 137