1 /*****************************************************************************
2  *  $Id: plan.cpp 581888 2019-03-06 19:46:12Z dmitrie1 $
3  * ===========================================================================
4  *
5  *                            PUBLIC DOMAIN NOTICE
6  *               National Center for Biotechnology Information
7  *
8  *  This software/database is a "United States Government Work" under the
9  *  terms of the United States Copyright Act.  It was written as part of
10  *  the author's official duties as a United States Government employee and
11  *  thus cannot be copyrighted.  This software/database is freely available
12  *  to the public for use. The National Library of Medicine and the U.S.
13  *  Government have not placed any restriction on its use or reproduction.
14  *
15  *  Although all reasonable efforts have been taken to ensure the accuracy
16  *  and reliability of the software and data, the NLM and the U.S.
17  *  Government do not and cannot warrant the performance or results that
18  *  may be obtained by using this software or data. The NLM and the U.S.
19  *  Government disclaim all warranties, express or implied, including
20  *  warranties of performance, merchantability or fitness for any particular
21  *  purpose.
22  *
23  *  Please cite the author in any work or product based on this material.
24  *
25  *  Db Cassandra: class generating execution plans for cassandra table fullscans.
26  *
27  */
28 
29 #include <ncbi_pch.hpp>
30 
31 #include <objtools/pubseq_gateway/impl/cassandra/fullscan/plan.hpp>
32 
33 #include <vector>
34 #include <string>
35 #include <utility>
36 #include <memory>
37 #include <algorithm>
38 
39 #include <corelib/ncbistr.hpp>
40 
41 BEGIN_IDBLOB_SCOPE
42 USING_NCBI_SCOPE;
43 
CCassandraFullscanPlan()44 CCassandraFullscanPlan::CCassandraFullscanPlan()
45     : m_Connection(nullptr)
46     , m_FieldList({"*"})
47     , m_MinPartitionsForSubrangeScan(kMinPartitionsForSubrangeScanDefault)
48 {
49 }
50 
SetConnection(shared_ptr<CCassConnection> connection)51 CCassandraFullscanPlan& CCassandraFullscanPlan::SetConnection(shared_ptr<CCassConnection> connection)
52 {
53     swap(m_Connection, connection);
54     return *this;
55 }
56 
SetFieldList(vector<string> fields)57 CCassandraFullscanPlan& CCassandraFullscanPlan::SetFieldList(vector<string> fields)
58 {
59     m_FieldList = move(fields);
60     return *this;
61 }
62 
SetWhereFilter(string const & where_filter)63 CCassandraFullscanPlan& CCassandraFullscanPlan::SetWhereFilter(string const & where_filter)
64 {
65     m_WhereFilter = where_filter;
66     return *this;
67 }
68 
SetMinPartitionsForSubrangeScan(size_t value)69 CCassandraFullscanPlan& CCassandraFullscanPlan::SetMinPartitionsForSubrangeScan(size_t value)
70 {
71     m_MinPartitionsForSubrangeScan = value;
72     return *this;
73 }
74 
SetKeyspace(string const & keyspace)75 CCassandraFullscanPlan& CCassandraFullscanPlan::SetKeyspace(string const & keyspace)
76 {
77     m_Keyspace = keyspace;
78     return *this;
79 }
80 
SetTable(string const & table)81 CCassandraFullscanPlan& CCassandraFullscanPlan::SetTable(string const & table)
82 {
83     m_Table = table;
84     return *this;
85 }
86 
GetMinPartitionsForSubrangeScan()87 size_t CCassandraFullscanPlan::GetMinPartitionsForSubrangeScan()
88 {
89     return m_MinPartitionsForSubrangeScan;
90 }
91 
GetPartitionCountEstimate()92 size_t CCassandraFullscanPlan::GetPartitionCountEstimate()
93 {
94     string datacenter, schema, schema_bytes;
95     int64_t peers_count = 0, partition_count = 0;
96 
97     shared_ptr<CCassQuery> query = m_Connection->NewQuery();
98     query->SetSQL("SELECT data_center, schema_version, uuidAsBlob(schema_version) FROM system.local", 0);
99     query->Query(CassConsistency::CASS_CONSISTENCY_LOCAL_ONE, false, false);
100     query->NextRow();
101     datacenter = query->FieldGetStrValue(0);
102     schema = query->FieldGetStrValue(1);
103     schema_bytes = query->FieldGetStrValue(2);
104     ERR_POST(Trace << "CCassandraFullscanPlanner::GetTableRowsCountEstimate - Datacenter  '" << datacenter << "'");
105     ERR_POST(Trace << "CCassandraFullscanPlanner::GetTableRowsCountEstimate - Schema  '" << schema << "'");
106     ERR_POST(Trace << "CCassandraFullscanPlanner::GetTableRowsCountEstimate - Bytes size " << schema_bytes.size());
107 
108     query = m_Connection->NewQuery();
109     query->SetSQL("SELECT count(*) FROM system.peers WHERE data_center = ? and schema_version = ? ALLOW FILTERING", 2);
110     query->BindStr(0, datacenter);
111     query->BindBytes(1, reinterpret_cast<const unsigned char*>(schema_bytes.c_str()), schema_bytes.size());
112     query->Query(CassConsistency::CASS_CONSISTENCY_LOCAL_ONE, false, false);
113     query->NextRow();
114     peers_count = query->FieldGetInt64Value(0, 0);
115     ERR_POST(Trace << "CCassandraFullscanPlanner::GetTableRowsCountEstimate - Peers count  '" << peers_count << "'");
116 
117     query = m_Connection->NewQuery();
118     query->SetSQL("SELECT partitions_count FROM system.size_estimates WHERE table_name = ? AND keyspace_name = ?", 2);
119     query->BindStr(0, m_Table);
120     query->BindStr(1, m_Keyspace);
121     query->Query(CassConsistency::CASS_CONSISTENCY_LOCAL_ONE, false, false);
122     while (query->NextRow() == ar_dataready) {
123         partition_count += query->FieldGetInt64Value(0);
124     }
125 
126     ERR_POST(Trace << "CCassandraFullscanPlanner::GetTableRowsCountEstimate - "
127         "Local rows estimate -  '" << partition_count << "'");
128     ERR_POST(Trace << "CCassandraFullscanPlanner::GetTableRowsCountEstimate - "
129         "Total rows estimate -  '" << partition_count * (peers_count + 1) << "'");
130 
131     return partition_count * (peers_count + 1);
132 }
133 
GetNextQuery()134 CCassandraFullscanPlan::TQueryPtr CCassandraFullscanPlan::GetNextQuery()
135 {
136     shared_ptr<CCassQuery> query;
137     if (m_TokenRanges.empty()) {
138         return nullptr;
139     } else if (m_TokenRanges.size() == 1 && m_TokenRanges[0].first == 0 && m_TokenRanges[0].second == 0) {
140         query = m_Connection->NewQuery();
141         query->SetSQL(m_SqlTemplate, 0);
142     } else {
143         query = m_Connection->NewQuery();
144         query->SetSQL(m_SqlTemplate, 2);
145         query->BindInt64(0, m_TokenRanges.back().first);
146         query->BindInt64(1, m_TokenRanges.back().second);
147     }
148     m_TokenRanges.pop_back();
149     return query;
150 }
151 
GetQueryCount() const152 size_t CCassandraFullscanPlan::GetQueryCount() const
153 {
154     return m_TokenRanges.size();
155 }
156 
Generate()157 void CCassandraFullscanPlan::Generate()
158 {
159     if (!m_Connection || m_Keyspace.empty() || m_Table.empty()) {
160         NCBI_THROW(CCassandraException, eSeqFailed, "Invalid sequence of operations, connection should be provided");
161     }
162 
163     m_TokenRanges.clear();
164     if (GetPartitionCountEstimate() < m_MinPartitionsForSubrangeScan) {
165         m_SqlTemplate = "SELECT " + NStr::Join(m_FieldList, ", ") + " FROM " + m_Keyspace + "." + m_Table;
166         if (!m_WhereFilter.empty())
167             m_SqlTemplate += " WHERE " + m_WhereFilter + " ALLOW FILTERING";
168         m_TokenRanges.push_back(make_pair(0, 0));
169     } else {
170         vector<string> partition_fields = m_Connection->GetPartitionKeyColumnNames(m_Keyspace, m_Table);
171         m_Connection->GetTokenRanges(m_TokenRanges);
172         string partition = NStr::Join(partition_fields, ",");
173         m_SqlTemplate = "SELECT " + NStr::Join(m_FieldList, ", ") + " FROM "
174             + m_Keyspace + "." + m_Table + " WHERE TOKEN(" + partition + ") > ? AND TOKEN(" + partition + ") <= ?";
175         if (!m_WhereFilter.empty())
176             m_SqlTemplate += " AND " + m_WhereFilter + " ALLOW FILTERING";
177         ERR_POST(Trace << "CCassandraFullscanPlanner::Generate - Sql template = '" << m_SqlTemplate << "'");
178     }
179 }
180 
GetTokenRanges()181 CCassConnection::TTokenRanges& CCassandraFullscanPlan::GetTokenRanges()
182 {
183     return m_TokenRanges;
184 }
185 
186 END_IDBLOB_SCOPE
187