1 /*****************************************************************************
2 * $Id: plan.cpp 581888 2019-03-06 19:46:12Z dmitrie1 $
3 * ===========================================================================
4 *
5 * PUBLIC DOMAIN NOTICE
6 * National Center for Biotechnology Information
7 *
8 * This software/database is a "United States Government Work" under the
9 * terms of the United States Copyright Act. It was written as part of
10 * the author's official duties as a United States Government employee and
11 * thus cannot be copyrighted. This software/database is freely available
12 * to the public for use. The National Library of Medicine and the U.S.
13 * Government have not placed any restriction on its use or reproduction.
14 *
15 * Although all reasonable efforts have been taken to ensure the accuracy
16 * and reliability of the software and data, the NLM and the U.S.
17 * Government do not and cannot warrant the performance or results that
18 * may be obtained by using this software or data. The NLM and the U.S.
19 * Government disclaim all warranties, express or implied, including
20 * warranties of performance, merchantability or fitness for any particular
21 * purpose.
22 *
23 * Please cite the author in any work or product based on this material.
24 *
25 * Db Cassandra: class generating execution plans for cassandra table fullscans.
26 *
27 */
28
29 #include <ncbi_pch.hpp>
30
31 #include <objtools/pubseq_gateway/impl/cassandra/fullscan/plan.hpp>
32
33 #include <vector>
34 #include <string>
35 #include <utility>
36 #include <memory>
37 #include <algorithm>
38
39 #include <corelib/ncbistr.hpp>
40
41 BEGIN_IDBLOB_SCOPE
42 USING_NCBI_SCOPE;
43
CCassandraFullscanPlan()44 CCassandraFullscanPlan::CCassandraFullscanPlan()
45 : m_Connection(nullptr)
46 , m_FieldList({"*"})
47 , m_MinPartitionsForSubrangeScan(kMinPartitionsForSubrangeScanDefault)
48 {
49 }
50
SetConnection(shared_ptr<CCassConnection> connection)51 CCassandraFullscanPlan& CCassandraFullscanPlan::SetConnection(shared_ptr<CCassConnection> connection)
52 {
53 swap(m_Connection, connection);
54 return *this;
55 }
56
SetFieldList(vector<string> fields)57 CCassandraFullscanPlan& CCassandraFullscanPlan::SetFieldList(vector<string> fields)
58 {
59 m_FieldList = move(fields);
60 return *this;
61 }
62
SetWhereFilter(string const & where_filter)63 CCassandraFullscanPlan& CCassandraFullscanPlan::SetWhereFilter(string const & where_filter)
64 {
65 m_WhereFilter = where_filter;
66 return *this;
67 }
68
SetMinPartitionsForSubrangeScan(size_t value)69 CCassandraFullscanPlan& CCassandraFullscanPlan::SetMinPartitionsForSubrangeScan(size_t value)
70 {
71 m_MinPartitionsForSubrangeScan = value;
72 return *this;
73 }
74
SetKeyspace(string const & keyspace)75 CCassandraFullscanPlan& CCassandraFullscanPlan::SetKeyspace(string const & keyspace)
76 {
77 m_Keyspace = keyspace;
78 return *this;
79 }
80
SetTable(string const & table)81 CCassandraFullscanPlan& CCassandraFullscanPlan::SetTable(string const & table)
82 {
83 m_Table = table;
84 return *this;
85 }
86
GetMinPartitionsForSubrangeScan()87 size_t CCassandraFullscanPlan::GetMinPartitionsForSubrangeScan()
88 {
89 return m_MinPartitionsForSubrangeScan;
90 }
91
GetPartitionCountEstimate()92 size_t CCassandraFullscanPlan::GetPartitionCountEstimate()
93 {
94 string datacenter, schema, schema_bytes;
95 int64_t peers_count = 0, partition_count = 0;
96
97 shared_ptr<CCassQuery> query = m_Connection->NewQuery();
98 query->SetSQL("SELECT data_center, schema_version, uuidAsBlob(schema_version) FROM system.local", 0);
99 query->Query(CassConsistency::CASS_CONSISTENCY_LOCAL_ONE, false, false);
100 query->NextRow();
101 datacenter = query->FieldGetStrValue(0);
102 schema = query->FieldGetStrValue(1);
103 schema_bytes = query->FieldGetStrValue(2);
104 ERR_POST(Trace << "CCassandraFullscanPlanner::GetTableRowsCountEstimate - Datacenter '" << datacenter << "'");
105 ERR_POST(Trace << "CCassandraFullscanPlanner::GetTableRowsCountEstimate - Schema '" << schema << "'");
106 ERR_POST(Trace << "CCassandraFullscanPlanner::GetTableRowsCountEstimate - Bytes size " << schema_bytes.size());
107
108 query = m_Connection->NewQuery();
109 query->SetSQL("SELECT count(*) FROM system.peers WHERE data_center = ? and schema_version = ? ALLOW FILTERING", 2);
110 query->BindStr(0, datacenter);
111 query->BindBytes(1, reinterpret_cast<const unsigned char*>(schema_bytes.c_str()), schema_bytes.size());
112 query->Query(CassConsistency::CASS_CONSISTENCY_LOCAL_ONE, false, false);
113 query->NextRow();
114 peers_count = query->FieldGetInt64Value(0, 0);
115 ERR_POST(Trace << "CCassandraFullscanPlanner::GetTableRowsCountEstimate - Peers count '" << peers_count << "'");
116
117 query = m_Connection->NewQuery();
118 query->SetSQL("SELECT partitions_count FROM system.size_estimates WHERE table_name = ? AND keyspace_name = ?", 2);
119 query->BindStr(0, m_Table);
120 query->BindStr(1, m_Keyspace);
121 query->Query(CassConsistency::CASS_CONSISTENCY_LOCAL_ONE, false, false);
122 while (query->NextRow() == ar_dataready) {
123 partition_count += query->FieldGetInt64Value(0);
124 }
125
126 ERR_POST(Trace << "CCassandraFullscanPlanner::GetTableRowsCountEstimate - "
127 "Local rows estimate - '" << partition_count << "'");
128 ERR_POST(Trace << "CCassandraFullscanPlanner::GetTableRowsCountEstimate - "
129 "Total rows estimate - '" << partition_count * (peers_count + 1) << "'");
130
131 return partition_count * (peers_count + 1);
132 }
133
GetNextQuery()134 CCassandraFullscanPlan::TQueryPtr CCassandraFullscanPlan::GetNextQuery()
135 {
136 shared_ptr<CCassQuery> query;
137 if (m_TokenRanges.empty()) {
138 return nullptr;
139 } else if (m_TokenRanges.size() == 1 && m_TokenRanges[0].first == 0 && m_TokenRanges[0].second == 0) {
140 query = m_Connection->NewQuery();
141 query->SetSQL(m_SqlTemplate, 0);
142 } else {
143 query = m_Connection->NewQuery();
144 query->SetSQL(m_SqlTemplate, 2);
145 query->BindInt64(0, m_TokenRanges.back().first);
146 query->BindInt64(1, m_TokenRanges.back().second);
147 }
148 m_TokenRanges.pop_back();
149 return query;
150 }
151
GetQueryCount() const152 size_t CCassandraFullscanPlan::GetQueryCount() const
153 {
154 return m_TokenRanges.size();
155 }
156
Generate()157 void CCassandraFullscanPlan::Generate()
158 {
159 if (!m_Connection || m_Keyspace.empty() || m_Table.empty()) {
160 NCBI_THROW(CCassandraException, eSeqFailed, "Invalid sequence of operations, connection should be provided");
161 }
162
163 m_TokenRanges.clear();
164 if (GetPartitionCountEstimate() < m_MinPartitionsForSubrangeScan) {
165 m_SqlTemplate = "SELECT " + NStr::Join(m_FieldList, ", ") + " FROM " + m_Keyspace + "." + m_Table;
166 if (!m_WhereFilter.empty())
167 m_SqlTemplate += " WHERE " + m_WhereFilter + " ALLOW FILTERING";
168 m_TokenRanges.push_back(make_pair(0, 0));
169 } else {
170 vector<string> partition_fields = m_Connection->GetPartitionKeyColumnNames(m_Keyspace, m_Table);
171 m_Connection->GetTokenRanges(m_TokenRanges);
172 string partition = NStr::Join(partition_fields, ",");
173 m_SqlTemplate = "SELECT " + NStr::Join(m_FieldList, ", ") + " FROM "
174 + m_Keyspace + "." + m_Table + " WHERE TOKEN(" + partition + ") > ? AND TOKEN(" + partition + ") <= ?";
175 if (!m_WhereFilter.empty())
176 m_SqlTemplate += " AND " + m_WhereFilter + " ALLOW FILTERING";
177 ERR_POST(Trace << "CCassandraFullscanPlanner::Generate - Sql template = '" << m_SqlTemplate << "'");
178 }
179 }
180
GetTokenRanges()181 CCassConnection::TTokenRanges& CCassandraFullscanPlan::GetTokenRanges()
182 {
183 return m_TokenRanges;
184 }
185
186 END_IDBLOB_SCOPE
187