1 /*
2   Copyright (c) DataStax, Inc.
3 
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7 
8   http://www.apache.org/licenses/LICENSE-2.0
9 
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15 */
16 
17 #include "loop_test.hpp"
18 
19 #include "query_request.hpp"
20 #include "session.hpp"
21 #include "uuids.hpp"
22 
23 using namespace mockssandra;
24 using datastax::internal::core::Config;
25 using datastax::internal::core::Future;
26 using datastax::internal::core::QueryRequest;
27 using datastax::internal::core::Session;
28 using datastax::internal::core::UuidGen;
29 
30 #define SELECT_LOCAL_SCHEMA_CHANGE "SELECT schema_version FROM system.local WHERE key='local'"
31 #define SELECT_PEERS_SCHEMA_CHANGE \
32   "SELECT peer, rpc_address, host_id, schema_version FROM system.peers"
33 
34 class SchemaAgreementUnitTest : public LoopTest {
35 public:
connect(Session * session,uint64_t wait_for_time_us=WAIT_FOR_TIME)36   static void connect(Session* session, uint64_t wait_for_time_us = WAIT_FOR_TIME) {
37     Config config;
38     config.set_max_schema_wait_time_ms(500);
39     config.contact_points().push_back(Address("127.0.0.1", 9042));
40     Future::Ptr connect_future(session->connect(config));
41     ASSERT_TRUE(connect_future->wait_for(wait_for_time_us))
42         << "Timed out waiting for session to connect";
43     ASSERT_FALSE(connect_future->error()) << cass_error_desc(connect_future->error()->code) << ": "
44                                           << connect_future->error()->message;
45   }
46 
close(Session * session,uint64_t wait_for_time_us=WAIT_FOR_TIME)47   static void close(Session* session, uint64_t wait_for_time_us = WAIT_FOR_TIME) {
48     Future::Ptr close_future(session->close());
49     ASSERT_TRUE(close_future->wait_for(wait_for_time_us))
50         << "Timed out waiting for session to close";
51     ASSERT_FALSE(close_future->error())
52         << cass_error_desc(close_future->error()->code) << ": " << close_future->error()->message;
53   }
54 
execute(Session * session,const String & query)55   static void execute(Session* session, const String& query) {
56     Future::Ptr request_future(session->execute(QueryRequest::ConstPtr(new QueryRequest(query))));
57     EXPECT_TRUE(request_future->wait_for(WAIT_FOR_TIME));
58     EXPECT_FALSE(request_future->error());
59   }
60 
61   struct SchemaVersionCheckCounts {
SchemaVersionCheckCountsSchemaAgreementUnitTest::SchemaVersionCheckCounts62     SchemaVersionCheckCounts()
63         : local_count(0)
64         , peers_count(0) {}
65     Atomic<int> local_count;
66     Atomic<int> peers_count;
67   };
68 
69   class SystemSchemaVersion : public Action {
70   public:
71     enum AgreementType { NEVER_REACH_AGREEMENT, IMMEDIATE_AGREEMENT };
72 
SystemSchemaVersion(AgreementType type,SchemaVersionCheckCounts * counts)73     SystemSchemaVersion(AgreementType type, SchemaVersionCheckCounts* counts)
74         : type_(type)
75         , check_counts_(counts) {
76       uuid_gen_.generate_random(&uuid_);
77     }
78 
on_run(Request * request) const79     void on_run(Request* request) const {
80       String query;
81       QueryParameters params;
82       if (!request->decode_query(&query, &params)) {
83         request->error(ERROR_PROTOCOL_ERROR, "Invalid query message");
84       } else if (query.find(SELECT_LOCAL_SCHEMA_CHANGE) != String::npos) {
85         ResultSet local_rs = ResultSet::Builder("system", "local")
86                                  .column("schema_version", Type::uuid())
87                                  .row(Row::Builder().uuid(generate_version()).build())
88                                  .build();
89         request->write(OPCODE_RESULT, local_rs.encode(request->version()));
90         check_counts_->local_count.fetch_add(1);
91       } else if (query.find(SELECT_PEERS_SCHEMA_CHANGE) != String::npos) {
92         ResultSet::Builder peers_builder = ResultSet::Builder("system", "peers")
93                                                .column("peer", Type::inet())
94                                                .column("rpc_address", Type::inet())
95                                                .column("host_id", Type::uuid())
96                                                .column("schema_version", Type::uuid());
97         Hosts hosts(request->hosts());
98         for (Hosts::const_iterator it = hosts.begin(), end = hosts.end(); it != end; ++it) {
99           const Host& host(*it);
100           if (host.address == request->address()) {
101             continue;
102           }
103           peers_builder.row(Row::Builder()
104                                 .inet(host.address)
105                                 .inet(host.address)
106                                 .uuid(generate_version()) // Doesn't matter
107                                 .uuid(generate_version())
108                                 .build());
109         }
110         ResultSet peers_rs = peers_builder.build();
111         request->write(OPCODE_RESULT, peers_rs.encode(request->version()));
112         check_counts_->peers_count.fetch_add(1);
113       } else {
114         run_next(request);
115       }
116     }
117 
118   private:
generate_version() const119     CassUuid generate_version() const {
120       CassUuid version;
121       if (type_ == IMMEDIATE_AGREEMENT) {
122         version = uuid_;
123       } else {
124         uuid_gen_.generate_random(&version);
125       }
126       return version;
127     }
128 
129   private:
130     AgreementType type_;
131     CassUuid uuid_;
132     SchemaVersionCheckCounts* check_counts_;
133     mutable UuidGen uuid_gen_;
134   };
135 
136   class SchemaChange : public Action {
137   public:
on_run(Request * request) const138     void on_run(Request* request) const {
139       String query;
140       QueryParameters params;
141       if (!request->decode_query(&query, &params)) {
142         request->error(ERROR_PROTOCOL_ERROR, "Invalid query message");
143       } else if (query.find("CREATE TABLE") != String::npos) {
144         request->write(OPCODE_RESULT, encode_schema_change("CREATE", "TABLE"));
145       } else if (query.find("DROP TABLE") != String::npos) {
146         request->write(OPCODE_RESULT, encode_schema_change("DROP", "TABLE"));
147       } else {
148         run_next(request);
149       }
150     }
151 
152   private:
encode_schema_change(String change_type,String target) const153     String encode_schema_change(String change_type, String target) const {
154       String body;
155       encode_int32(RESULT_SCHEMA_CHANGE, &body); // Result type
156       encode_string(change_type, &body);
157       encode_string("keyspace", &body);
158       if (target == "TABLE") {
159         encode_string("table", &body);
160       }
161       return body;
162     }
163   };
164 };
165 
166 /**
167  * Verify that schema changes wait for schema agreement.
168  */
TEST_F(SchemaAgreementUnitTest,Simple)169 TEST_F(SchemaAgreementUnitTest, Simple) {
170   SchemaVersionCheckCounts check_counts;
171 
172   mockssandra::SimpleRequestHandlerBuilder builder;
173   builder.on(OPCODE_QUERY)
174       .execute(new SystemSchemaVersion(SystemSchemaVersion::IMMEDIATE_AGREEMENT, &check_counts))
175       .execute(new SchemaChange())
176       .system_local()
177       .system_peers()
178       .empty_rows_result(1);
179 
180   mockssandra::SimpleCluster cluster(builder.build(), 3);
181   ASSERT_EQ(cluster.start_all(), 0);
182 
183   Session session;
184   connect(&session);
185 
186   add_logging_critera("Found schema agreement in");
187 
188   execute(&session, "CREATE TABLE tbl (key text PRIMARY KEY, value text)");
189   EXPECT_EQ(check_counts.local_count.load(), 1);
190   EXPECT_EQ(check_counts.peers_count.load(), 1);
191   EXPECT_EQ(logging_criteria_count(), 1);
192 
193   cluster.stop(2);
194   execute(&session, "DROP TABLE tbl");
195   EXPECT_EQ(check_counts.local_count.load(), 2);
196   EXPECT_EQ(check_counts.peers_count.load(), 2);
197   EXPECT_EQ(logging_criteria_count(), 2);
198 
199   cluster.stop(3);
200   execute(&session, "CREATE TABLE tbl (key text PRIMARY KEY, value text)");
201   EXPECT_EQ(check_counts.local_count.load(), 3);
202   EXPECT_EQ(check_counts.peers_count.load(), 3);
203   EXPECT_EQ(logging_criteria_count(), 3);
204 
205   close(&session);
206 }
207 
208 /**
209  * Verify that schema changes will timeout properly while waiting for schema agreement.
210  */
TEST_F(SchemaAgreementUnitTest,Timeout)211 TEST_F(SchemaAgreementUnitTest, Timeout) {
212   SchemaVersionCheckCounts check_counts;
213 
214   mockssandra::SimpleRequestHandlerBuilder builder;
215   builder.on(OPCODE_QUERY)
216       .execute(new SystemSchemaVersion(SystemSchemaVersion::NEVER_REACH_AGREEMENT, &check_counts))
217       .execute(new SchemaChange())
218       .system_local()
219       .system_peers()
220       .empty_rows_result(1);
221 
222   mockssandra::SimpleCluster cluster(builder.build(), 3);
223   ASSERT_EQ(cluster.start_all(), 0);
224 
225   Session session;
226   connect(&session);
227 
228   add_logging_critera("No schema agreement on live nodes after ");
229 
230   execute(&session, "CREATE TABLE tbl (key text PRIMARY KEY, value text)");
231 
232   EXPECT_GT(check_counts.local_count.load(), 1);
233   EXPECT_GT(check_counts.peers_count.load(), 1);
234 
235   EXPECT_EQ(logging_criteria_count(), 1);
236 
237   close(&session);
238 }
239