1 /*
2 Copyright (c) DataStax, Inc.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #include "loop_test.hpp"
18
19 #include "query_request.hpp"
20 #include "session.hpp"
21 #include "uuids.hpp"
22
23 using namespace mockssandra;
24 using datastax::internal::core::Config;
25 using datastax::internal::core::Future;
26 using datastax::internal::core::QueryRequest;
27 using datastax::internal::core::Session;
28 using datastax::internal::core::UuidGen;
29
30 #define SELECT_LOCAL_SCHEMA_CHANGE "SELECT schema_version FROM system.local WHERE key='local'"
31 #define SELECT_PEERS_SCHEMA_CHANGE \
32 "SELECT peer, rpc_address, host_id, schema_version FROM system.peers"
33
34 class SchemaAgreementUnitTest : public LoopTest {
35 public:
connect(Session * session,uint64_t wait_for_time_us=WAIT_FOR_TIME)36 static void connect(Session* session, uint64_t wait_for_time_us = WAIT_FOR_TIME) {
37 Config config;
38 config.set_max_schema_wait_time_ms(500);
39 config.contact_points().push_back(Address("127.0.0.1", 9042));
40 Future::Ptr connect_future(session->connect(config));
41 ASSERT_TRUE(connect_future->wait_for(wait_for_time_us))
42 << "Timed out waiting for session to connect";
43 ASSERT_FALSE(connect_future->error()) << cass_error_desc(connect_future->error()->code) << ": "
44 << connect_future->error()->message;
45 }
46
close(Session * session,uint64_t wait_for_time_us=WAIT_FOR_TIME)47 static void close(Session* session, uint64_t wait_for_time_us = WAIT_FOR_TIME) {
48 Future::Ptr close_future(session->close());
49 ASSERT_TRUE(close_future->wait_for(wait_for_time_us))
50 << "Timed out waiting for session to close";
51 ASSERT_FALSE(close_future->error())
52 << cass_error_desc(close_future->error()->code) << ": " << close_future->error()->message;
53 }
54
execute(Session * session,const String & query)55 static void execute(Session* session, const String& query) {
56 Future::Ptr request_future(session->execute(QueryRequest::ConstPtr(new QueryRequest(query))));
57 EXPECT_TRUE(request_future->wait_for(WAIT_FOR_TIME));
58 EXPECT_FALSE(request_future->error());
59 }
60
61 struct SchemaVersionCheckCounts {
SchemaVersionCheckCountsSchemaAgreementUnitTest::SchemaVersionCheckCounts62 SchemaVersionCheckCounts()
63 : local_count(0)
64 , peers_count(0) {}
65 Atomic<int> local_count;
66 Atomic<int> peers_count;
67 };
68
69 class SystemSchemaVersion : public Action {
70 public:
71 enum AgreementType { NEVER_REACH_AGREEMENT, IMMEDIATE_AGREEMENT };
72
SystemSchemaVersion(AgreementType type,SchemaVersionCheckCounts * counts)73 SystemSchemaVersion(AgreementType type, SchemaVersionCheckCounts* counts)
74 : type_(type)
75 , check_counts_(counts) {
76 uuid_gen_.generate_random(&uuid_);
77 }
78
on_run(Request * request) const79 void on_run(Request* request) const {
80 String query;
81 QueryParameters params;
82 if (!request->decode_query(&query, ¶ms)) {
83 request->error(ERROR_PROTOCOL_ERROR, "Invalid query message");
84 } else if (query.find(SELECT_LOCAL_SCHEMA_CHANGE) != String::npos) {
85 ResultSet local_rs = ResultSet::Builder("system", "local")
86 .column("schema_version", Type::uuid())
87 .row(Row::Builder().uuid(generate_version()).build())
88 .build();
89 request->write(OPCODE_RESULT, local_rs.encode(request->version()));
90 check_counts_->local_count.fetch_add(1);
91 } else if (query.find(SELECT_PEERS_SCHEMA_CHANGE) != String::npos) {
92 ResultSet::Builder peers_builder = ResultSet::Builder("system", "peers")
93 .column("peer", Type::inet())
94 .column("rpc_address", Type::inet())
95 .column("host_id", Type::uuid())
96 .column("schema_version", Type::uuid());
97 Hosts hosts(request->hosts());
98 for (Hosts::const_iterator it = hosts.begin(), end = hosts.end(); it != end; ++it) {
99 const Host& host(*it);
100 if (host.address == request->address()) {
101 continue;
102 }
103 peers_builder.row(Row::Builder()
104 .inet(host.address)
105 .inet(host.address)
106 .uuid(generate_version()) // Doesn't matter
107 .uuid(generate_version())
108 .build());
109 }
110 ResultSet peers_rs = peers_builder.build();
111 request->write(OPCODE_RESULT, peers_rs.encode(request->version()));
112 check_counts_->peers_count.fetch_add(1);
113 } else {
114 run_next(request);
115 }
116 }
117
118 private:
generate_version() const119 CassUuid generate_version() const {
120 CassUuid version;
121 if (type_ == IMMEDIATE_AGREEMENT) {
122 version = uuid_;
123 } else {
124 uuid_gen_.generate_random(&version);
125 }
126 return version;
127 }
128
129 private:
130 AgreementType type_;
131 CassUuid uuid_;
132 SchemaVersionCheckCounts* check_counts_;
133 mutable UuidGen uuid_gen_;
134 };
135
136 class SchemaChange : public Action {
137 public:
on_run(Request * request) const138 void on_run(Request* request) const {
139 String query;
140 QueryParameters params;
141 if (!request->decode_query(&query, ¶ms)) {
142 request->error(ERROR_PROTOCOL_ERROR, "Invalid query message");
143 } else if (query.find("CREATE TABLE") != String::npos) {
144 request->write(OPCODE_RESULT, encode_schema_change("CREATE", "TABLE"));
145 } else if (query.find("DROP TABLE") != String::npos) {
146 request->write(OPCODE_RESULT, encode_schema_change("DROP", "TABLE"));
147 } else {
148 run_next(request);
149 }
150 }
151
152 private:
encode_schema_change(String change_type,String target) const153 String encode_schema_change(String change_type, String target) const {
154 String body;
155 encode_int32(RESULT_SCHEMA_CHANGE, &body); // Result type
156 encode_string(change_type, &body);
157 encode_string("keyspace", &body);
158 if (target == "TABLE") {
159 encode_string("table", &body);
160 }
161 return body;
162 }
163 };
164 };
165
166 /**
167 * Verify that schema changes wait for schema agreement.
168 */
TEST_F(SchemaAgreementUnitTest,Simple)169 TEST_F(SchemaAgreementUnitTest, Simple) {
170 SchemaVersionCheckCounts check_counts;
171
172 mockssandra::SimpleRequestHandlerBuilder builder;
173 builder.on(OPCODE_QUERY)
174 .execute(new SystemSchemaVersion(SystemSchemaVersion::IMMEDIATE_AGREEMENT, &check_counts))
175 .execute(new SchemaChange())
176 .system_local()
177 .system_peers()
178 .empty_rows_result(1);
179
180 mockssandra::SimpleCluster cluster(builder.build(), 3);
181 ASSERT_EQ(cluster.start_all(), 0);
182
183 Session session;
184 connect(&session);
185
186 add_logging_critera("Found schema agreement in");
187
188 execute(&session, "CREATE TABLE tbl (key text PRIMARY KEY, value text)");
189 EXPECT_EQ(check_counts.local_count.load(), 1);
190 EXPECT_EQ(check_counts.peers_count.load(), 1);
191 EXPECT_EQ(logging_criteria_count(), 1);
192
193 cluster.stop(2);
194 execute(&session, "DROP TABLE tbl");
195 EXPECT_EQ(check_counts.local_count.load(), 2);
196 EXPECT_EQ(check_counts.peers_count.load(), 2);
197 EXPECT_EQ(logging_criteria_count(), 2);
198
199 cluster.stop(3);
200 execute(&session, "CREATE TABLE tbl (key text PRIMARY KEY, value text)");
201 EXPECT_EQ(check_counts.local_count.load(), 3);
202 EXPECT_EQ(check_counts.peers_count.load(), 3);
203 EXPECT_EQ(logging_criteria_count(), 3);
204
205 close(&session);
206 }
207
208 /**
209 * Verify that schema changes will timeout properly while waiting for schema agreement.
210 */
TEST_F(SchemaAgreementUnitTest,Timeout)211 TEST_F(SchemaAgreementUnitTest, Timeout) {
212 SchemaVersionCheckCounts check_counts;
213
214 mockssandra::SimpleRequestHandlerBuilder builder;
215 builder.on(OPCODE_QUERY)
216 .execute(new SystemSchemaVersion(SystemSchemaVersion::NEVER_REACH_AGREEMENT, &check_counts))
217 .execute(new SchemaChange())
218 .system_local()
219 .system_peers()
220 .empty_rows_result(1);
221
222 mockssandra::SimpleCluster cluster(builder.build(), 3);
223 ASSERT_EQ(cluster.start_all(), 0);
224
225 Session session;
226 connect(&session);
227
228 add_logging_critera("No schema agreement on live nodes after ");
229
230 execute(&session, "CREATE TABLE tbl (key text PRIMARY KEY, value text)");
231
232 EXPECT_GT(check_counts.local_count.load(), 1);
233 EXPECT_GT(check_counts.peers_count.load(), 1);
234
235 EXPECT_EQ(logging_criteria_count(), 1);
236
237 close(&session);
238 }
239