1 // Copyright 2019 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "google/cloud/spanner/internal/connection_impl.h"
16 #include "google/cloud/spanner/client.h"
17 #include "google/cloud/spanner/internal/spanner_stub.h"
18 #include "google/cloud/spanner/testing/matchers.h"
19 #include "google/cloud/spanner/testing/mock_spanner_stub.h"
20 #include "google/cloud/testing_util/assert_ok.h"
21 #include "google/cloud/testing_util/is_proto_equal.h"
22 #include "google/cloud/testing_util/status_matchers.h"
23 #include "absl/memory/memory.h"
24 #include "absl/types/optional.h"
25 #include <google/protobuf/text_format.h>
26 #include <gmock/gmock.h>
27 #include <array>
28 #include <atomic>
29 #include <chrono>
30 #include <future>
31 #include <string>
32 #include <thread>
33 #include <vector>
34
35 namespace google {
36 namespace cloud {
37 namespace spanner {
38 inline namespace SPANNER_CLIENT_NS {
39 namespace internal {
40 namespace {
41
42 // We compile with -Wextra which enables -Wmissing-field-initializers. This
43 // warning triggers when aggregate initialization is used with too few
44 // arguments. For example
45 //
46 // struct A { int a; int b; int c; }; // 3 fields
47 // A a = {1, 2}; // <-- Warning, missing initializer for A::c.
48 //
49 // To make the test code in this file more readable, we disable this warning
// and rely on the guaranteed behavior of aggregate initialization.
51 // https://en.cppreference.com/w/cpp/language/aggregate_initialization
52 // Note: "pragma GCC" works for both GCC and clang. MSVC doesn't warn.
53 #if defined(__GNUC__) || defined(__clang__)
54 #pragma GCC diagnostic push
55 #pragma GCC diagnostic ignored "-Wmissing-field-initializers"
56 #endif
57
58 using ::google::cloud::spanner_testing::HasSessionAndTransactionId;
59 using ::google::cloud::testing_util::IsProtoEqual;
60 using ::google::cloud::testing_util::StatusIs;
61 using ::google::protobuf::TextFormat;
62 using ::testing::_;
63 using ::testing::AtLeast;
64 using ::testing::ByMove;
65 using ::testing::DoAll;
66 using ::testing::Eq;
67 using ::testing::HasSubstr;
68 using ::testing::InSequence;
69 using ::testing::Property;
70 using ::testing::Return;
71 using ::testing::Sequence;
72 using ::testing::SetArgPointee;
73 using ::testing::StartsWith;
74 using ::testing::UnorderedPointwise;
75
76 namespace spanner_proto = ::google::spanner::v1;
77
78 // Matches a spanner_proto::ReadRequest with the specified `session` and
79 // `transaction_id`.
80 MATCHER_P2(ReadRequestHasSessionAndTransactionId, session, transaction_id,
81 "ReadRequest has expected session and transaction") {
82 return arg.session() == session && arg.transaction().id() == transaction_id;
83 }
84
85 // Matches a spanner_proto::ReadRequest with the specified `session` and
86 // 'begin` set in the TransactionSelector.
87 MATCHER_P(ReadRequestHasSessionAndBeginTransaction, session,
88 "ReadRequest has expected session and transaction") {
89 return arg.session() == session && arg.transaction().has_begin();
90 }
91
92 // Matches a spanner_proto::BatchCreateSessionsRequest with the specified
93 // `database`.
94 MATCHER_P(BatchCreateSessionsRequestHasDatabase, database,
95 "BatchCreateSessionsRequest has expected database") {
96 return arg.database() == database.FullName();
97 }
98
99 // Matches a spanner::Transaction that is bound to a "bad" session"
100 MATCHER(HasBadSession, "bound to a session that's marked bad") {
101 return internal::Visit(
102 arg,
103 [&](internal::SessionHolder& session,
__anonf3d7660c0202(internal::SessionHolder& session, StatusOr<google::spanner::v1::TransactionSelector>&, std::int64_t) 104 StatusOr<google::spanner::v1::TransactionSelector>&, std::int64_t) {
105 if (!session) {
106 *result_listener << "has no session";
107 return false;
108 }
109 if (!session->is_bad()) {
110 *result_listener << "session expected to be bad, but was not";
111 return false;
112 }
113 return true;
114 });
115 }
116
117 // Helper to set the Transaction's ID. Requires selector.ok().
SetTransactionId(Transaction & txn,std::string tid)118 void SetTransactionId(Transaction& txn, std::string tid) {
119 internal::Visit(txn,
120 [&tid](SessionHolder&,
121 StatusOr<spanner_proto::TransactionSelector>& selector,
122 std::int64_t) {
123 selector->set_id(std::move(tid));
124 return 0;
125 });
126 }
127
128 // Helper to mark the Transaction as invalid.
SetTransactionInvalid(Transaction & txn,Status status)129 void SetTransactionInvalid(Transaction& txn, Status status) {
130 internal::Visit(
131 txn, [&status](SessionHolder&,
132 StatusOr<spanner_proto::TransactionSelector>& selector,
133 std::int64_t) {
134 selector = std::move(status);
135 return 0;
136 });
137 }
138
139 // Helper to create a Transaction proto with a specified (or default) id.
MakeTestTransaction(std::string id="1234567890")140 spanner_proto::Transaction MakeTestTransaction(std::string id = "1234567890") {
141 spanner_proto::Transaction txn;
142 txn.set_id(std::move(id));
143 return txn;
144 }
145
146 // Create a response with the given `sessions`
MakeSessionsResponse(std::vector<std::string> sessions)147 spanner_proto::BatchCreateSessionsResponse MakeSessionsResponse(
148 std::vector<std::string> sessions) {
149 spanner_proto::BatchCreateSessionsResponse response;
150 for (auto& session : sessions) {
151 response.add_session()->set_name(std::move(session));
152 }
153 return response;
154 }
155
156 // Create a `Connection` suitable for use in tests that continue retrying
157 // until the retry policy is exhausted - attempting that with the default
158 // policies would take too long (10 minutes).
159 // Other tests can use this method or just call `MakeConnection()` directly.
MakeLimitedRetryConnection(Database const & db,std::shared_ptr<spanner_testing::MockSpannerStub> mock)160 std::shared_ptr<Connection> MakeLimitedRetryConnection(
161 Database const& db,
162 std::shared_ptr<spanner_testing::MockSpannerStub> mock) {
163 return MakeConnection(
164 db, {std::move(mock)},
165 ConnectionOptions{grpc::InsecureChannelCredentials()},
166 SessionPoolOptions{},
167 LimitedErrorCountRetryPolicy(/*maximum_failures=*/2).clone(),
168 ExponentialBackoffPolicy(/*initial_delay=*/std::chrono::microseconds(1),
169 /*maximum_delay=*/std::chrono::microseconds(1),
170 /*scaling=*/2.0)
171 .clone());
172 }
173
// A gmock implementation of the gRPC client-side streaming reader for
// spanner_proto::PartialResultSet messages. Tests in this file return
// instances of it from the mocked `StreamingRead` / `ExecuteStreamingSql` stub
// calls in order to script what the stream yields and how it finishes.
class MockGrpcReader
    : public ::grpc::ClientReaderInterface<spanner_proto::PartialResultSet> {
 public:
  // Tests script this to fill in the pointee and return true for each
  // message, then return false to signal the end of the stream.
  MOCK_METHOD1(Read, bool(spanner_proto::PartialResultSet*));
  MOCK_METHOD1(NextMessageSize, bool(std::uint32_t*));
  // Tests script this to return the final status of the stream.
  MOCK_METHOD0(Finish, grpc::Status());
  MOCK_METHOD0(WaitForInitialMetadata, void());
};
182
/// @test When `BatchCreateSessions` fails, the test expects every row
/// produced by Read() to carry that same error.
TEST(ConnectionImplTest, ReadGetSessionFailure) {
  auto db = Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto mock = std::make_shared<spanner_testing::MockSpannerStub>();
  auto conn = MakeConnection(
      db, {mock}, ConnectionOptions{grpc::InsecureChannelCredentials()});

  // Session creation is attempted exactly once, for the right database, and
  // is refused.
  auto refuse_sessions =
      [&db](grpc::ClientContext&,
            spanner_proto::BatchCreateSessionsRequest const& request) {
        EXPECT_EQ(db.FullName(), request.database());
        return Status(StatusCode::kPermissionDenied, "uh-oh in GetSession");
      };
  EXPECT_CALL(*mock, BatchCreateSessions(_, _)).WillOnce(refuse_sessions);

  auto rows =
      conn->Read({MakeSingleUseTransaction(Transaction::ReadOnlyOptions()),
                  "table",
                  KeySet::All(),
                  {"column1"}});
  for (auto& row : rows) {
    EXPECT_THAT(row, StatusIs(StatusCode::kPermissionDenied,
                              HasSubstr("uh-oh in GetSession")));
  }
}
207
/// @test When the streaming reader ends at once and `Finish()` reports
/// PERMISSION_DENIED, the test expects the rows from Read() to carry that
/// error.
TEST(ConnectionImplTest, ReadStreamingReadFailure) {
  auto db = Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto mock = std::make_shared<spanner_testing::MockSpannerStub>();
  auto conn = MakeConnection(
      db, {mock}, ConnectionOptions{grpc::InsecureChannelCredentials()});

  auto create_session =
      [&db](grpc::ClientContext&,
            spanner_proto::BatchCreateSessionsRequest const& request) {
        EXPECT_EQ(db.FullName(), request.database());
        return MakeSessionsResponse({"test-session-name"});
      };
  EXPECT_CALL(*mock, BatchCreateSessions(_, _)).WillOnce(create_session);

  // A stream that yields no messages and then finishes with an error.
  auto failing_reader = absl::make_unique<MockGrpcReader>();
  EXPECT_CALL(*failing_reader, Read(_)).WillOnce(Return(false));
  EXPECT_CALL(*failing_reader, Finish())
      .WillOnce(Return(grpc::Status(grpc::StatusCode::PERMISSION_DENIED,
                                    "uh-oh in GrpcReader::Finish")));
  EXPECT_CALL(*mock, StreamingRead(_, _))
      .WillOnce(Return(ByMove(std::move(failing_reader))));

  auto rows =
      conn->Read({MakeSingleUseTransaction(Transaction::ReadOnlyOptions()),
                  "table",
                  KeySet::All(),
                  {"column1"}});
  for (auto& row : rows) {
    EXPECT_THAT(row, StatusIs(StatusCode::kPermissionDenied,
                              HasSubstr("uh-oh in GrpcReader::Finish")));
  }
}
242
/// @test The first `StreamingRead` stream finishes with UNAVAILABLE and the
/// second delivers two rows; the test expects Read() to yield exactly those
/// two rows, correctly typed and in order.
TEST(ConnectionImplTest, ReadSuccess) {
  auto mock = std::make_shared<spanner_testing::MockSpannerStub>();

  auto db = Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto conn = MakeConnection(
      db, {mock}, ConnectionOptions{grpc::InsecureChannelCredentials()});
  EXPECT_CALL(*mock, BatchCreateSessions(_, _))
      .WillOnce(
          [&db](grpc::ClientContext&,
                spanner_proto::BatchCreateSessionsRequest const& request) {
            EXPECT_EQ(db.FullName(), request.database());
            return MakeSessionsResponse({"test-session-name"});
          });

  // First stream: no data, finishes with a transient error.
  auto reader1 = absl::make_unique<MockGrpcReader>();
  EXPECT_CALL(*reader1, Read(_)).WillOnce(Return(false));
  EXPECT_CALL(*reader1, Finish())
      .WillOnce(
          Return(grpc::Status(grpc::StatusCode::UNAVAILABLE, "try-again")));

  // Second stream: one message holding two (UserId, UserName) rows, then a
  // clean end of stream.
  auto reader2 = absl::make_unique<MockGrpcReader>();
  auto constexpr kText = R"pb(
    metadata: {
      row_type: {
        fields: {
          name: "UserId",
          type: { code: INT64 }
        }
        fields: {
          name: "UserName",
          type: { code: STRING }
        }
      }
    }
    values: { string_value: "12" }
    values: { string_value: "Steve" }
    values: { string_value: "42" }
    values: { string_value: "Ann" }
  )pb";
  spanner_proto::PartialResultSet response;
  ASSERT_TRUE(TextFormat::ParseFromString(kText, &response));
  EXPECT_CALL(*reader2, Read(_))
      .WillOnce(DoAll(SetArgPointee<0>(response), Return(true)))
      .WillOnce(Return(false));
  EXPECT_CALL(*reader2, Finish()).WillOnce(Return(grpc::Status()));
  EXPECT_CALL(*mock, StreamingRead(_, _))
      .WillOnce(Return(ByMove(std::move(reader1))))
      .WillOnce(Return(ByMove(std::move(reader2))));

  auto rows =
      conn->Read({MakeSingleUseTransaction(Transaction::ReadOnlyOptions()),
                  "table",
                  KeySet::All(),
                  {"UserId", "UserName"}});
  using RowType = std::tuple<std::int64_t, std::string>;
  auto const expected = std::vector<RowType>{
      RowType(12, "Steve"),
      RowType(42, "Ann"),
  };
  std::vector<RowType> actual;
  for (auto& row : StreamOf<RowType>(rows)) {
    // Abort on the first failed row: a row holding an error has no value to
    // read, so continuing would not be meaningful.
    ASSERT_STATUS_OK(row);
    actual.push_back(*row);
  }
  // Comparing the whole vectors checks values, order and row count at once,
  // without indexing into `expected` with an unchecked index and without a
  // signed/unsigned comparison of the row count.
  EXPECT_EQ(expected, actual);
}
310
/// @test With a limited-retry connection, a PERMISSION_DENIED from the
/// stream's `Finish()` is expected to be reported after a single
/// `StreamingRead` attempt.
TEST(ConnectionImplTest, ReadPermanentFailure) {
  auto db = Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto mock = std::make_shared<spanner_testing::MockSpannerStub>();
  auto conn = MakeLimitedRetryConnection(db, mock);

  auto create_session =
      [&db](grpc::ClientContext&,
            spanner_proto::BatchCreateSessionsRequest const& request) {
        EXPECT_EQ(db.FullName(), request.database());
        return MakeSessionsResponse({"test-session-name"});
      };
  EXPECT_CALL(*mock, BatchCreateSessions(_, _)).WillOnce(create_session);

  // The only stream handed out ends immediately with a permanent error.
  auto denied_reader = absl::make_unique<MockGrpcReader>();
  EXPECT_CALL(*denied_reader, Read(_)).WillOnce(Return(false));
  grpc::Status const denied(grpc::StatusCode::PERMISSION_DENIED, "uh-oh");
  EXPECT_CALL(*denied_reader, Finish()).WillOnce(Return(denied));
  EXPECT_CALL(*mock, StreamingRead(_, _))
      .WillOnce(Return(ByMove(std::move(denied_reader))));

  auto rows =
      conn->Read({MakeSingleUseTransaction(Transaction::ReadOnlyOptions()),
                  "table",
                  KeySet::All(),
                  {"UserId", "UserName"}});
  for (auto& row : rows) {
    EXPECT_THAT(row,
                StatusIs(StatusCode::kPermissionDenied, HasSubstr("uh-oh")));
  }
}
342
/// @test With a limited-retry connection, every `StreamingRead` attempt
/// fails with UNAVAILABLE; the test expects at least two attempts and then
/// the UNAVAILABLE error on the returned rows.
TEST(ConnectionImplTest, ReadTooManyTransientFailures) {
  auto db = Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto mock = std::make_shared<spanner_testing::MockSpannerStub>();
  auto conn = MakeLimitedRetryConnection(db, mock);

  auto create_session =
      [&db](grpc::ClientContext&,
            spanner_proto::BatchCreateSessionsRequest const& request) {
        EXPECT_EQ(db.FullName(), request.database());
        return MakeSessionsResponse({"test-session-name"});
      };
  EXPECT_CALL(*mock, BatchCreateSessions(_, _)).WillOnce(create_session);

  // Each attempt gets a brand-new reader that yields nothing and finishes
  // with a transient error.
  auto make_unavailable_reader = [](grpc::ClientContext&,
                                    spanner_proto::ReadRequest const&) {
    auto reader = absl::make_unique<MockGrpcReader>();
    EXPECT_CALL(*reader, Read(_)).WillOnce(Return(false));
    grpc::Status const unavailable(grpc::StatusCode::UNAVAILABLE, "try-again");
    EXPECT_CALL(*reader, Finish()).WillOnce(Return(unavailable));
    return reader;
  };
  EXPECT_CALL(*mock, StreamingRead(_, _))
      .Times(AtLeast(2))
      .WillRepeatedly(make_unavailable_reader);

  auto rows =
      conn->Read({MakeSingleUseTransaction(Transaction::ReadOnlyOptions()),
                  "table",
                  KeySet::All(),
                  {"UserId", "UserName"}});
  for (auto& row : rows) {
    EXPECT_THAT(row,
                StatusIs(StatusCode::kUnavailable, HasSubstr("try-again")));
  }
}
378
379 /// @test Verify implicit "begin transaction" in Read() works.
TEST(ConnectionImplTest,ReadImplicitBeginTransaction)380 TEST(ConnectionImplTest, ReadImplicitBeginTransaction) {
381 auto mock = std::make_shared<spanner_testing::MockSpannerStub>();
382
383 auto db = Database("dummy_project", "dummy_instance", "dummy_database_id");
384 auto conn = MakeConnection(
385 db, {mock}, ConnectionOptions{grpc::InsecureChannelCredentials()});
386 EXPECT_CALL(*mock, BatchCreateSessions(_, _))
387 .WillOnce(
388 [&db](grpc::ClientContext&,
389 spanner_proto::BatchCreateSessionsRequest const& request) {
390 EXPECT_EQ(db.FullName(), request.database());
391 return MakeSessionsResponse({"test-session-name"});
392 });
393
394 auto grpc_reader = absl::make_unique<MockGrpcReader>();
395 auto constexpr kText = R"pb(metadata: { transaction: { id: "ABCDEF00" } })pb";
396 spanner_proto::PartialResultSet response;
397 ASSERT_TRUE(TextFormat::ParseFromString(kText, &response));
398 EXPECT_CALL(*grpc_reader, Read(_))
399 .WillOnce(DoAll(SetArgPointee<0>(response), Return(true)))
400 .WillOnce(Return(false));
401 EXPECT_CALL(*grpc_reader, Finish()).WillOnce(Return(grpc::Status()));
402 EXPECT_CALL(*mock, StreamingRead(_, _))
403 .WillOnce(Return(ByMove(std::move(grpc_reader))));
404
405 Transaction txn = MakeReadOnlyTransaction(Transaction::ReadOnlyOptions());
406 auto rows = conn->Read({txn, "table", KeySet::All(), {"UserId", "UserName"}});
407 for (auto& row : rows) {
408 EXPECT_STATUS_OK(row);
409 }
410 EXPECT_THAT(txn, HasSessionAndTransactionId("test-session-name", "ABCDEF00"));
411 }
412
/// @test Read() on a not-yet-begun transaction where the read itself fails
/// permanently. The expectations below require, in order: session creation, a
/// `StreamingRead` asking to begin a transaction (fails), an explicit
/// `BeginTransaction` (succeeds with id "FEDCBA98"), and a second
/// `StreamingRead` using that id (fails again). The caller is expected to see
/// the PERMISSION_DENIED error.
TEST(ConnectionImplTest, ReadImplicitBeginTransactionPermanentFailure) {
  auto mock = std::make_shared<spanner_testing::MockSpannerStub>();

  auto db = Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto conn = MakeLimitedRetryConnection(db, mock);

  // Builds a reader that yields no messages and finishes with
  // PERMISSION_DENIED.
  auto make_reader = [] {
    grpc::Status grpc_status(grpc::StatusCode::PERMISSION_DENIED, "uh-oh");
    auto reader = absl::make_unique<MockGrpcReader>();
    EXPECT_CALL(*reader, Read(_)).WillOnce(Return(false));
    EXPECT_CALL(*reader, Finish()).WillOnce(Return(grpc_status));
    return reader;
  };
  // One reader per expected `StreamingRead` call.
  auto reader1 = make_reader();
  auto reader2 = make_reader();
  // n.b. these calls are explicitly sequenced because using the scoped
  // `InSequence` object causes gmock to get confused by the reader calls.
  Sequence s;
  EXPECT_CALL(*mock, BatchCreateSessions(_, _))
      .InSequence(s)
      .WillOnce(
          [&db](grpc::ClientContext&,
                spanner_proto::BatchCreateSessionsRequest const& request) {
            EXPECT_EQ(db.FullName(), request.database());
            return MakeSessionsResponse({"test-session-name"});
          });
  // First attempt: the read request itself asks to begin the transaction.
  EXPECT_CALL(*mock, StreamingRead(_, ReadRequestHasSessionAndBeginTransaction(
                                          "test-session-name")))
      .InSequence(s)
      .WillOnce(Return(ByMove(std::move(reader1))));
  // Next, an explicit BeginTransaction is expected, which succeeds.
  EXPECT_CALL(*mock, BeginTransaction(_, _))
      .InSequence(s)
      .WillOnce(Return(MakeTestTransaction("FEDCBA98")));
  // Second attempt: the read must reference the transaction id returned above.
  EXPECT_CALL(*mock, StreamingRead(_, ReadRequestHasSessionAndTransactionId(
                                          "test-session-name", "FEDCBA98")))
      .InSequence(s)
      .WillOnce(Return(ByMove(std::move(reader2))));

  Transaction txn = MakeReadOnlyTransaction(Transaction::ReadOnlyOptions());
  auto rows = conn->Read({txn, "table", KeySet::All(), {"UserId", "UserName"}});
  for (auto& row : rows) {
    EXPECT_THAT(row,
                StatusIs(StatusCode::kPermissionDenied, HasSubstr("uh-oh")));
  }
}
458
/// @test When `BatchCreateSessions` fails, the test expects every row
/// produced by ExecuteQuery() to carry that same error.
TEST(ConnectionImplTest, ExecuteQueryGetSessionFailure) {
  auto db = Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto mock = std::make_shared<spanner_testing::MockSpannerStub>();
  auto conn = MakeConnection(
      db, {mock}, ConnectionOptions{grpc::InsecureChannelCredentials()});

  // Session creation is attempted exactly once, for the right database, and
  // is refused.
  auto refuse_sessions =
      [&db](grpc::ClientContext&,
            spanner_proto::BatchCreateSessionsRequest const& request) {
        EXPECT_EQ(db.FullName(), request.database());
        return Status(StatusCode::kPermissionDenied, "uh-oh in GetSession");
      };
  EXPECT_CALL(*mock, BatchCreateSessions(_, _)).WillOnce(refuse_sessions);

  auto rows = conn->ExecuteQuery(
      {MakeSingleUseTransaction(Transaction::ReadOnlyOptions()),
       SqlStatement("select * from table")});
  for (auto& row : rows) {
    EXPECT_THAT(row, StatusIs(StatusCode::kPermissionDenied,
                              HasSubstr("uh-oh in GetSession")));
  }
}
480
/// @test When the `ExecuteStreamingSql` stream ends at once and `Finish()`
/// reports PERMISSION_DENIED, the test expects the rows from ExecuteQuery()
/// to carry that error.
TEST(ConnectionImplTest, ExecuteQueryStreamingReadFailure) {
  auto db = Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto mock = std::make_shared<spanner_testing::MockSpannerStub>();
  auto conn = MakeConnection(
      db, {mock}, ConnectionOptions{grpc::InsecureChannelCredentials()});

  auto create_session =
      [&db](grpc::ClientContext&,
            spanner_proto::BatchCreateSessionsRequest const& request) {
        EXPECT_EQ(db.FullName(), request.database());
        return MakeSessionsResponse({"test-session-name"});
      };
  EXPECT_CALL(*mock, BatchCreateSessions(_, _)).WillOnce(create_session);

  // A stream that yields no messages and then finishes with an error.
  auto failing_reader = absl::make_unique<MockGrpcReader>();
  EXPECT_CALL(*failing_reader, Read(_)).WillOnce(Return(false));
  EXPECT_CALL(*failing_reader, Finish())
      .WillOnce(Return(grpc::Status(grpc::StatusCode::PERMISSION_DENIED,
                                    "uh-oh in GrpcReader::Finish")));
  EXPECT_CALL(*mock, ExecuteStreamingSql(_, _))
      .WillOnce(Return(ByMove(std::move(failing_reader))));

  auto rows = conn->ExecuteQuery(
      {MakeSingleUseTransaction(Transaction::ReadOnlyOptions()),
       SqlStatement("select * from table")});
  for (auto& row : rows) {
    EXPECT_THAT(row, StatusIs(StatusCode::kPermissionDenied,
                              HasSubstr("uh-oh in GrpcReader::Finish")));
  }
}
512
/// @test The `ExecuteStreamingSql` stream delivers one message holding two
/// rows; the test expects ExecuteQuery() to yield exactly those two rows,
/// correctly typed and in order.
TEST(ConnectionImplTest, ExecuteQueryReadSuccess) {
  auto mock = std::make_shared<spanner_testing::MockSpannerStub>();

  auto db = Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto conn = MakeConnection(
      db, {mock}, ConnectionOptions{grpc::InsecureChannelCredentials()});
  EXPECT_CALL(*mock, BatchCreateSessions(_, _))
      .WillOnce(
          [&db](grpc::ClientContext&,
                spanner_proto::BatchCreateSessionsRequest const& request) {
            EXPECT_EQ(db.FullName(), request.database());
            return MakeSessionsResponse({"test-session-name"});
          });

  // One message holding two (UserId, UserName) rows, then a clean end of
  // stream.
  auto grpc_reader = absl::make_unique<MockGrpcReader>();
  auto constexpr kText = R"pb(
    metadata: {
      row_type: {
        fields: {
          name: "UserId",
          type: { code: INT64 }
        }
        fields: {
          name: "UserName",
          type: { code: STRING }
        }
      }
    }
    values: { string_value: "12" }
    values: { string_value: "Steve" }
    values: { string_value: "42" }
    values: { string_value: "Ann" }
  )pb";
  spanner_proto::PartialResultSet response;
  ASSERT_TRUE(TextFormat::ParseFromString(kText, &response));
  EXPECT_CALL(*grpc_reader, Read(_))
      .WillOnce(DoAll(SetArgPointee<0>(response), Return(true)))
      .WillOnce(Return(false));
  EXPECT_CALL(*grpc_reader, Finish()).WillOnce(Return(grpc::Status()));
  EXPECT_CALL(*mock, ExecuteStreamingSql(_, _))
      .WillOnce(Return(ByMove(std::move(grpc_reader))));

  auto rows = conn->ExecuteQuery(
      {MakeSingleUseTransaction(Transaction::ReadOnlyOptions()),
       SqlStatement("select * from table")});
  using RowType = std::tuple<std::int64_t, std::string>;
  auto const expected = std::vector<RowType>{
      RowType(12, "Steve"),
      RowType(42, "Ann"),
  };
  std::vector<RowType> actual;
  for (auto& row : StreamOf<RowType>(rows)) {
    // Abort on the first failed row: a row holding an error has no value to
    // read, so continuing would not be meaningful.
    ASSERT_STATUS_OK(row);
    actual.push_back(*row);
  }
  // Comparing the whole vectors checks values, order and row count at once,
  // without indexing into `expected` with an unchecked index and without a
  // signed/unsigned comparison of the row count.
  EXPECT_EQ(expected, actual);
}
571
572 /// @test Verify implicit "begin transaction" in ExecuteQuery() works.
TEST(ConnectionImplTest,ExecuteQueryImplicitBeginTransaction)573 TEST(ConnectionImplTest, ExecuteQueryImplicitBeginTransaction) {
574 auto mock = std::make_shared<spanner_testing::MockSpannerStub>();
575
576 auto db = Database("dummy_project", "dummy_instance", "dummy_database_id");
577 auto conn = MakeConnection(
578 db, {mock}, ConnectionOptions{grpc::InsecureChannelCredentials()});
579 EXPECT_CALL(*mock, BatchCreateSessions(_, _))
580 .WillOnce(
581 [&db](grpc::ClientContext&,
582 spanner_proto::BatchCreateSessionsRequest const& request) {
583 EXPECT_EQ(db.FullName(), request.database());
584 return MakeSessionsResponse({"test-session-name"});
585 });
586
587 auto grpc_reader = absl::make_unique<MockGrpcReader>();
588 auto constexpr kText = R"pb(metadata: { transaction: { id: "00FEDCBA" } })pb";
589 spanner_proto::PartialResultSet response;
590 ASSERT_TRUE(TextFormat::ParseFromString(kText, &response));
591 EXPECT_CALL(*grpc_reader, Read(_))
592 .WillOnce(DoAll(SetArgPointee<0>(response), Return(true)))
593 .WillOnce(Return(false));
594 EXPECT_CALL(*grpc_reader, Finish()).WillOnce(Return(grpc::Status()));
595 EXPECT_CALL(*mock, ExecuteStreamingSql(_, _))
596 .WillOnce(Return(ByMove(std::move(grpc_reader))));
597
598 Transaction txn = MakeReadOnlyTransaction(Transaction::ReadOnlyOptions());
599 auto rows = conn->ExecuteQuery({txn, SqlStatement("select * from table")});
600 for (auto& row : rows) {
601 EXPECT_STATUS_OK(row);
602 }
603 EXPECT_THAT(txn, HasSessionAndTransactionId("test-session-name", "00FEDCBA"));
604 }
605
/// @test For each optimizer-version setting, checks that the `query_options`
/// field of every ExecuteSqlRequest sent to the stub matches the
/// `QueryOptions` supplied in the `SqlParams`.
TEST(ConnectionImplTest, QueryOptions) {
  // Pointer to the request accessor whose value the matcher `m` inspects.
  auto constexpr kQueryOptionsProp =
      &spanner_proto::ExecuteSqlRequest::query_options;
  // Three cases: version not set, set to the empty string, set to a
  // non-empty value.
  std::vector<absl::optional<std::string>> const optimizer_versions = {
      {}, "", "some-version"};

  for (auto const& version : optimizer_versions) {
    // `qo` is the proto the outgoing requests are expected to contain.
    spanner_proto::ExecuteSqlRequest::QueryOptions qo;
    if (version) qo.set_optimizer_version(*version);
    auto m = Property(kQueryOptionsProp, IsProtoEqual(qo));
    // A fresh stub and connection are created for every case.
    auto mock = std::make_shared<spanner_testing::MockSpannerStub>();
    EXPECT_CALL(*mock, BatchCreateSessions(_, _))
        .WillOnce(Return(MakeSessionsResponse({"session-name"})));

    auto txn = MakeReadOnlyTransaction(Transaction::ReadOnlyOptions());
    auto query_options = QueryOptions().set_optimizer_version(version);
    auto params = Connection::SqlParams{txn, SqlStatement{}, query_options};
    auto db = Database("dummy_project", "dummy_instance", "dummy_database_id");
    auto conn = MakeConnection(
        db, {mock}, ConnectionOptions{grpc::InsecureChannelCredentials()});

    // We wrap MockGrpcReader in NiceMock, because we don't really care how
    // it's called in this test (aside from needing to return a transaction in
    // the first call), and we want to minimize GMock's "uninteresting mock
    // function call" warnings.
    using ::testing::NiceMock;
    auto grpc_reader = absl::make_unique<NiceMock<MockGrpcReader>>();
    auto constexpr kResponseText =
        R"pb(metadata: { transaction: { id: "ABCDEF00" } })pb";
    spanner_proto::PartialResultSet response;
    ASSERT_TRUE(TextFormat::ParseFromString(kResponseText, &response));
    EXPECT_CALL(*grpc_reader, Read(_))
        .WillOnce(DoAll(SetArgPointee<0>(response), Return(true)));

    // Calls the 5 Connection::* methods that take SqlParams and ensures that
    // the protos being sent contain the expected options.
    // The two streaming calls (ExecuteQuery, ProfileQuery) must each send a
    // request matching `m`; only the first gets the scripted reader.
    EXPECT_CALL(*mock, ExecuteStreamingSql(_, m))
        .WillOnce(Return(ByMove(std::move(grpc_reader))))
        .WillOnce(
            Return(ByMove(absl::make_unique<NiceMock<MockGrpcReader>>())));
    (void)conn->ExecuteQuery(params);
    (void)conn->ProfileQuery(params);

    // The three non-streaming calls must each send a request matching `m`;
    // their results are deliberately ignored.
    EXPECT_CALL(*mock, ExecuteSql(_, m)).Times(3);
    (void)conn->ExecuteDml(params);
    (void)conn->ProfileDml(params);
    (void)conn->AnalyzeSql(params);
  }
}
655
/// @test When `BatchCreateSessions` fails, the test expects ExecuteDml() to
/// return that same error.
TEST(ConnectionImplTest, ExecuteDmlGetSessionFailure) {
  auto db = Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto mock = std::make_shared<spanner_testing::MockSpannerStub>();
  auto conn = MakeConnection(
      db, {mock}, ConnectionOptions{grpc::InsecureChannelCredentials()});

  // Session creation is attempted exactly once, for the right database, and
  // is refused.
  auto refuse_sessions =
      [&db](grpc::ClientContext&,
            spanner_proto::BatchCreateSessionsRequest const& request) {
        EXPECT_EQ(db.FullName(), request.database());
        return Status(StatusCode::kPermissionDenied, "uh-oh in GetSession");
      };
  EXPECT_CALL(*mock, BatchCreateSessions(_, _)).WillOnce(refuse_sessions);

  Transaction txn = MakeReadWriteTransaction(Transaction::ReadWriteOptions());
  auto outcome = conn->ExecuteDml({txn, SqlStatement("delete * from table")});

  EXPECT_THAT(outcome, StatusIs(StatusCode::kPermissionDenied,
                                HasSubstr("uh-oh in GetSession")));
}
675
/// @test `ExecuteSql` fails once with UNAVAILABLE and then succeeds; the test
/// expects ExecuteDml() to succeed and report the modified-row count taken
/// from the successful response.
TEST(ConnectionImplTest, ExecuteDmlDeleteSuccess) {
  auto db = Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto mock = std::make_shared<spanner_testing::MockSpannerStub>();
  auto conn = MakeConnection(
      db, {mock}, ConnectionOptions{grpc::InsecureChannelCredentials()});

  EXPECT_CALL(*mock, BatchCreateSessions(_, _))
      .WillOnce(Return(MakeSessionsResponse({"session-name"})));

  // The successful response: a transaction id plus an exact row count.
  spanner_proto::ResultSet dml_response;
  auto constexpr kText = R"pb(
    metadata: { transaction: { id: "1234567890" } }
    stats: { row_count_exact: 42 }
  )pb";
  ASSERT_TRUE(TextFormat::ParseFromString(kText, &dml_response));

  Status const transient(StatusCode::kUnavailable, "try-again");
  EXPECT_CALL(*mock, ExecuteSql(_, _))
      .WillOnce(Return(transient))
      .WillOnce(Return(dml_response));

  Transaction txn = MakeReadWriteTransaction(Transaction::ReadWriteOptions());
  auto outcome = conn->ExecuteDml({txn, SqlStatement("delete * from table")});

  ASSERT_STATUS_OK(outcome);
  EXPECT_EQ(outcome->RowsModified(), 42);
}
701
/// @test `ExecuteSql` fails with PERMISSION_DENIED both before and after an
/// explicit `BeginTransaction`; the test expects ExecuteDml() to return that
/// error.
TEST(ConnectionImplTest, ExecuteDmlDeletePermanentFailure) {
  auto db = Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto mock = std::make_shared<spanner_testing::MockSpannerStub>();
  auto conn = MakeConnection(
      db, {mock}, ConnectionOptions{grpc::InsecureChannelCredentials()});

  Status const denied(StatusCode::kPermissionDenied, "uh-oh in ExecuteDml");
  {
    // The stub calls must arrive in exactly this order.
    InSequence ordered;
    EXPECT_CALL(*mock, BatchCreateSessions(_, _))
        .WillOnce(Return(MakeSessionsResponse({"session-name"})));
    EXPECT_CALL(*mock, ExecuteSql(_, _)).WillOnce(Return(denied));
    EXPECT_CALL(*mock, BeginTransaction(_, _))
        .WillOnce(Return(MakeTestTransaction()));
    EXPECT_CALL(*mock, ExecuteSql(_, _)).WillOnce(Return(denied));
  }

  Transaction txn = MakeReadWriteTransaction(Transaction::ReadWriteOptions());
  auto outcome = conn->ExecuteDml({txn, SqlStatement("delete * from table")});

  EXPECT_THAT(outcome, StatusIs(StatusCode::kPermissionDenied,
                                HasSubstr("uh-oh in ExecuteDml")));
}
725
/// @test With a limited-retry connection, `ExecuteSql` keeps failing with
/// UNAVAILABLE both before and after an explicit `BeginTransaction`; the test
/// expects at least two attempts in each phase and then the UNAVAILABLE error
/// from ExecuteDml().
TEST(ConnectionImplTest, ExecuteDmlDeleteTooManyTransientFailures) {
  auto db = Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto mock = std::make_shared<spanner_testing::MockSpannerStub>();
  auto conn = MakeLimitedRetryConnection(db, mock);

  Status const transient(StatusCode::kUnavailable, "try-again in ExecuteDml");
  {
    // The stub calls must arrive in exactly this order.
    InSequence ordered;
    EXPECT_CALL(*mock, BatchCreateSessions(_, _))
        .WillOnce(Return(MakeSessionsResponse({"session-name"})));
    EXPECT_CALL(*mock, ExecuteSql(_, _))
        .Times(AtLeast(2))
        .WillRepeatedly(Return(transient));
    EXPECT_CALL(*mock, BeginTransaction(_, _))
        .WillOnce(Return(MakeTestTransaction()));
    EXPECT_CALL(*mock, ExecuteSql(_, _))
        .Times(AtLeast(2))
        .WillRepeatedly(Return(transient));
  }

  Transaction txn = MakeReadWriteTransaction(Transaction::ReadWriteOptions());
  auto outcome = conn->ExecuteDml({txn, SqlStatement("delete * from table")});

  EXPECT_THAT(outcome, StatusIs(StatusCode::kUnavailable,
                                HasSubstr("try-again in ExecuteDml")));
}
752
753 // Tests that a Transaction that fails to begin does not successfully commit.
754 // That would violate atomicity since the first DML statement did not execute.
TEST(ConnectionImplTest, ExecuteDmlTransactionAtomicity) {
  auto database =
      Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  auto connection = MakeConnection(
      database, {stub}, ConnectionOptions{grpc::InsecureChannelCredentials()});

  // The two failures share a status code but carry different messages, so the
  // assertions below can tell which RPC's status reached the caller.
  Status const sql_failure(StatusCode::kInvalidArgument, "ExecuteSql status");
  Status const begin_failure(StatusCode::kInvalidArgument,
                             "BeginTransaction status");
  {
    InSequence ordered;
    EXPECT_CALL(*stub, BatchCreateSessions(_, _))
        .WillOnce(Return(MakeSessionsResponse({"session-name"})));

    // Only two more RPCs are expected: the `ExecuteSql` that tries to begin
    // the transaction implicitly, and the explicit `BeginTransaction` that
    // follows when it fails. Once both have failed the transaction is
    // invalid, and the client must reject later operations without issuing
    // any further RPCs.
    EXPECT_CALL(*stub, ExecuteSql(_, _)).WillOnce(Return(sql_failure));
    EXPECT_CALL(*stub, BeginTransaction(_, _))
        .WillOnce(Return(begin_failure));
  }

  auto txn = MakeReadWriteTransaction(Transaction::ReadWriteOptions());

  // The first operation reports the status of its own RPC (`ExecuteSql`).
  auto first_dml = connection->ExecuteDml({txn, SqlStatement("some statement")});
  EXPECT_THAT(first_dml, StatusIs(sql_failure.code(),
                                  HasSubstr(sql_failure.message())));

  // Every later operation reports the `BeginTransaction` status instead.
  auto second_dml =
      connection->ExecuteDml({txn, SqlStatement("another statement")});
  EXPECT_THAT(second_dml, StatusIs(begin_failure.code(),
                                   HasSubstr(begin_failure.message())));

  auto commit = connection->Commit({txn, {}});
  EXPECT_THAT(commit, StatusIs(begin_failure.code(),
                               HasSubstr(begin_failure.message())));
}
787
TEST(ConnectionImplTest, ExecuteDmlTransactionMissing) {
  auto database =
      Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  auto connection = MakeConnection(
      database, {stub}, ConnectionOptions{grpc::InsecureChannelCredentials()});

  // The reply parses as a `ResultSet`, but its metadata names no transaction.
  spanner_proto::ResultSet reply_without_txn;
  ASSERT_TRUE(TextFormat::ParseFromString("metadata: {}", &reply_without_txn));

  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(Return(MakeSessionsResponse({"session-name"})));
  EXPECT_CALL(*stub, ExecuteSql(_, _)).WillOnce(Return(reply_without_txn));

  auto txn = MakeReadWriteTransaction(Transaction::ReadWriteOptions());
  auto outcome = connection->ExecuteDml({txn, SqlStatement("select 1")});

  // A successful RPC that fails to return the requested transaction must be
  // reported as an internal error.
  EXPECT_THAT(
      outcome,
      StatusIs(StatusCode::kInternal,
               HasSubstr(
                   "Begin transaction requested but no transaction returned")));
}
809
// Verifies that `ProfileQuery` streams the rows of a successful response and
// exposes the query plan and execution statistics carried in its `stats`.
TEST(ConnectionImplTest, ProfileQuerySuccess) {
  auto mock = std::make_shared<spanner_testing::MockSpannerStub>();
  auto db = Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto conn = MakeConnection(
      db, {mock}, ConnectionOptions{grpc::InsecureChannelCredentials()});
  EXPECT_CALL(*mock, BatchCreateSessions(_, _))
      .WillOnce(
          [&db](grpc::ClientContext&,
                spanner_proto::BatchCreateSessionsRequest const& request) {
            EXPECT_EQ(db.FullName(), request.database());
            return MakeSessionsResponse({"test-session-name"});
          });

  // A single streamed response carrying the metadata, two rows (two values
  // each), and the profiling statistics.
  auto grpc_reader = absl::make_unique<MockGrpcReader>();
  auto constexpr kText = R"pb(
    metadata: {
      row_type: {
        fields: {
          name: "UserId",
          type: { code: INT64 }
        }
        fields: {
          name: "UserName",
          type: { code: STRING }
        }
      }
    }
    values: { string_value: "12" }
    values: { string_value: "Steve" }
    values: { string_value: "42" }
    values: { string_value: "Ann" }
    stats: {
      query_plan { plan_nodes: { index: 42 } }
      query_stats {
        fields {
          key: "elapsed_time"
          value { string_value: "42 secs" }
        }
      }
    }
  )pb";
  spanner_proto::PartialResultSet response;
  ASSERT_TRUE(TextFormat::ParseFromString(kText, &response));
  EXPECT_CALL(*grpc_reader, Read(_))
      .WillOnce(DoAll(SetArgPointee<0>(response), Return(true)))
      .WillOnce(Return(false));
  EXPECT_CALL(*grpc_reader, Finish()).WillOnce(Return(grpc::Status()));
  EXPECT_CALL(*mock, ExecuteStreamingSql(_, _))
      .WillOnce(Return(ByMove(std::move(grpc_reader))));

  auto result = conn->ProfileQuery(
      {MakeSingleUseTransaction(Transaction::ReadOnlyOptions()),
       SqlStatement("select * from table")});
  using RowType = std::tuple<std::int64_t, std::string>;
  auto expected = std::vector<RowType>{
      RowType(12, "Steve"),
      RowType(42, "Ann"),
  };
  // Unsigned counter: it is compared against, and used to index, a vector.
  std::size_t row_number = 0;
  for (auto& row : StreamOf<RowType>(result)) {
    // These two checks are fatal on purpose. Dereferencing a failed `row`, or
    // indexing past the end of `expected`, would be undefined behavior rather
    // than a reported test failure.
    ASSERT_STATUS_OK(row);
    ASSERT_LT(row_number, expected.size());
    EXPECT_EQ(*row, expected[row_number]);
    ++row_number;
  }
  EXPECT_EQ(row_number, expected.size());

  auto constexpr kTextExpectedPlan = R"pb(
    plan_nodes: { index: 42 }
  )pb";
  google::spanner::v1::QueryPlan expected_plan;
  ASSERT_TRUE(TextFormat::ParseFromString(kTextExpectedPlan, &expected_plan));

  auto plan = result.ExecutionPlan();
  ASSERT_TRUE(plan);
  EXPECT_THAT(*plan, IsProtoEqual(expected_plan));

  std::vector<std::pair<const std::string, std::string>> expected_stats;
  expected_stats.emplace_back("elapsed_time", "42 secs");
  auto execution_stats = result.ExecutionStats();
  ASSERT_TRUE(execution_stats);
  EXPECT_THAT(*execution_stats, UnorderedPointwise(Eq(), expected_stats));
}
892
// Verifies that a session-creation failure is reported through the rows
// returned by `ProfileQuery`.
TEST(ConnectionImplTest, ProfileQueryGetSessionFailure) {
  auto mock = std::make_shared<spanner_testing::MockSpannerStub>();
  auto db = Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto conn = MakeConnection(
      db, {mock}, ConnectionOptions{grpc::InsecureChannelCredentials()});
  EXPECT_CALL(*mock, BatchCreateSessions(_, _))
      .WillOnce(
          [&db](grpc::ClientContext&,
                spanner_proto::BatchCreateSessionsRequest const& request) {
            EXPECT_EQ(db.FullName(), request.database());
            return Status(StatusCode::kPermissionDenied, "uh-oh in GetSession");
          });

  auto result = conn->ProfileQuery(
      {MakeSingleUseTransaction(Transaction::ReadOnlyOptions()),
       SqlStatement("select * from table")});
  int rows_seen = 0;
  for (auto& row : result) {
    ++rows_seen;
    EXPECT_EQ(StatusCode::kPermissionDenied, row.status().code());
    EXPECT_THAT(row.status().message(), HasSubstr("uh-oh in GetSession"));
  }
  // Without this check an empty stream would skip the loop entirely and the
  // test would pass without verifying anything.
  EXPECT_GT(rows_seen, 0) << "the failure was not reported through any row";
}
914
// Verifies that a failure reported by the streaming reader's `Finish()` is
// surfaced through the rows returned by `ProfileQuery`.
TEST(ConnectionImplTest, ProfileQueryStreamingReadFailure) {
  auto mock = std::make_shared<spanner_testing::MockSpannerStub>();

  auto db = Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto conn = MakeConnection(
      db, {mock}, ConnectionOptions{grpc::InsecureChannelCredentials()});
  EXPECT_CALL(*mock, BatchCreateSessions(_, _))
      .WillOnce(
          [&db](grpc::ClientContext&,
                spanner_proto::BatchCreateSessionsRequest const& request) {
            EXPECT_EQ(db.FullName(), request.database());
            return MakeSessionsResponse({"test-session-name"});
          });

  // The reader yields no data at all; the error only shows up in `Finish()`.
  auto grpc_reader = absl::make_unique<MockGrpcReader>();
  EXPECT_CALL(*grpc_reader, Read(_)).WillOnce(Return(false));
  grpc::Status finish_status(grpc::StatusCode::PERMISSION_DENIED,
                             "uh-oh in GrpcReader::Finish");
  EXPECT_CALL(*grpc_reader, Finish()).WillOnce(Return(finish_status));
  EXPECT_CALL(*mock, ExecuteStreamingSql(_, _))
      .WillOnce(Return(ByMove(std::move(grpc_reader))));

  auto result = conn->ProfileQuery(
      {MakeSingleUseTransaction(Transaction::ReadOnlyOptions()),
       SqlStatement("select * from table")});
  int rows_seen = 0;
  for (auto& row : result) {
    ++rows_seen;
    EXPECT_EQ(StatusCode::kPermissionDenied, row.status().code());
    EXPECT_THAT(row.status().message(),
                HasSubstr("uh-oh in GrpcReader::Finish"));
  }
  // Without this check an empty stream would skip the loop entirely and the
  // test would pass without verifying anything.
  EXPECT_GT(rows_seen, 0) << "the failure was not reported through any row";
}
946
TEST(ConnectionImplTest, ProfileDmlGetSessionFailure) {
  auto database =
      Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  auto connection = MakeConnection(
      database, {stub}, ConnectionOptions{grpc::InsecureChannelCredentials()});

  // Session creation is rejected, after checking that the request names the
  // database under test.
  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(
          [&database](grpc::ClientContext&,
                      spanner_proto::BatchCreateSessionsRequest const& req) {
            EXPECT_EQ(database.FullName(), req.database());
            return Status(StatusCode::kPermissionDenied, "uh-oh in GetSession");
          });

  auto txn = MakeReadWriteTransaction(Transaction::ReadWriteOptions());
  auto outcome =
      connection->ProfileDml({txn, SqlStatement("delete * from table")});

  // The session failure is what the caller of `ProfileDml` must see.
  auto const& failure = outcome.status();
  EXPECT_EQ(StatusCode::kPermissionDenied, failure.code());
  EXPECT_THAT(failure.message(), HasSubstr("uh-oh in GetSession"));
}
966
TEST(ConnectionImplTest, ProfileDmlDeleteSuccess) {
  auto database =
      Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  auto connection = MakeConnection(
      database, {stub}, ConnectionOptions{grpc::InsecureChannelCredentials()});

  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(Return(MakeSessionsResponse({"session-name"})));

  // The successful reply carries the transaction id, the modified row count,
  // a one-node query plan and a single execution statistic.
  auto constexpr kReplyText = R"pb(
    metadata: { transaction: { id: "1234567890" } }
    stats: {
      row_count_exact: 42
      query_plan { plan_nodes: { index: 42 } }
      query_stats {
        fields {
          key: "elapsed_time"
          value { string_value: "42 secs" }
        }
      }
    }
  )pb";
  spanner_proto::ResultSet reply;
  ASSERT_TRUE(TextFormat::ParseFromString(kReplyText, &reply));

  // One transient failure precedes the successful reply.
  Status const transient(StatusCode::kUnavailable, "try-again");
  EXPECT_CALL(*stub, ExecuteSql(_, _))
      .WillOnce(Return(transient))
      .WillOnce(Return(reply));

  auto txn = MakeReadWriteTransaction(Transaction::ReadWriteOptions());
  auto outcome =
      connection->ProfileDml({txn, SqlStatement("delete * from table")});

  ASSERT_STATUS_OK(outcome);
  EXPECT_EQ(outcome->RowsModified(), 42);

  auto constexpr kPlanText = R"pb(
    plan_nodes: { index: 42 }
  )pb";
  google::spanner::v1::QueryPlan wanted_plan;
  ASSERT_TRUE(TextFormat::ParseFromString(kPlanText, &wanted_plan));

  auto actual_plan = outcome->ExecutionPlan();
  ASSERT_TRUE(actual_plan);
  EXPECT_THAT(*actual_plan, IsProtoEqual(wanted_plan));

  std::vector<std::pair<const std::string, std::string>> wanted_stats;
  wanted_stats.emplace_back("elapsed_time", "42 secs");
  auto actual_stats = outcome->ExecutionStats();
  ASSERT_TRUE(actual_stats);
  EXPECT_THAT(*actual_stats, UnorderedPointwise(Eq(), wanted_stats));
}
1017
TEST(ConnectionImplTest, ProfileDmlDeletePermanentFailure) {
  auto database =
      Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  auto connection = MakeConnection(
      database, {stub}, ConnectionOptions{grpc::InsecureChannelCredentials()});

  Status const denied(StatusCode::kPermissionDenied, "uh-oh in ExecuteDml");
  {
    // Required RPC order: session creation, a rejected `ExecuteSql`, a
    // successful `BeginTransaction`, and a second rejected `ExecuteSql`.
    InSequence ordered;
    EXPECT_CALL(*stub, BatchCreateSessions(_, _))
        .WillOnce(Return(MakeSessionsResponse({"session-name"})));
    EXPECT_CALL(*stub, ExecuteSql(_, _)).WillOnce(Return(denied));
    EXPECT_CALL(*stub, BeginTransaction(_, _))
        .WillOnce(Return(MakeTestTransaction()));
    EXPECT_CALL(*stub, ExecuteSql(_, _)).WillOnce(Return(denied));
  }

  auto txn = MakeReadWriteTransaction(Transaction::ReadWriteOptions());
  auto outcome =
      connection->ProfileDml({txn, SqlStatement("delete * from table")});

  auto const& failure = outcome.status();
  EXPECT_EQ(StatusCode::kPermissionDenied, failure.code());
  EXPECT_THAT(failure.message(), HasSubstr("uh-oh in ExecuteDml"));
}
1041
TEST(ConnectionImplTest, ProfileDmlDeleteTooManyTransientFailures) {
  auto database =
      Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  auto connection = MakeLimitedRetryConnection(database, stub);

  Status const transient(StatusCode::kUnavailable, "try-again in ExecuteDml");
  {
    // Required RPC order: session creation, at least two failing
    // `ExecuteSql` attempts, a successful `BeginTransaction`, and at least
    // two more failing `ExecuteSql` attempts.
    InSequence ordered;
    EXPECT_CALL(*stub, BatchCreateSessions(_, _))
        .WillOnce(Return(MakeSessionsResponse({"session-name"})));
    EXPECT_CALL(*stub, ExecuteSql(_, _))
        .Times(AtLeast(2))
        .WillRepeatedly(Return(transient));
    EXPECT_CALL(*stub, BeginTransaction(_, _))
        .WillOnce(Return(MakeTestTransaction()));
    EXPECT_CALL(*stub, ExecuteSql(_, _))
        .Times(AtLeast(2))
        .WillRepeatedly(Return(transient));
  }

  auto txn = MakeReadWriteTransaction(Transaction::ReadWriteOptions());
  auto outcome =
      connection->ProfileDml({txn, SqlStatement("delete * from table")});

  auto const& failure = outcome.status();
  EXPECT_EQ(StatusCode::kUnavailable, failure.code());
  EXPECT_THAT(failure.message(), HasSubstr("try-again in ExecuteDml"));
}
1068
// Verifies that `AnalyzeSql` succeeds after one transient `ExecuteSql`
// failure and returns the query plan carried in the response's `stats`.
TEST(ConnectionImplTest, AnalyzeSqlSuccess) {
  auto mock = std::make_shared<spanner_testing::MockSpannerStub>();

  auto db = Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto conn = MakeConnection(
      db, {mock}, ConnectionOptions{grpc::InsecureChannelCredentials()});
  EXPECT_CALL(*mock, BatchCreateSessions(_, _))
      .WillOnce(
          [&db](grpc::ClientContext&,
                spanner_proto::BatchCreateSessionsRequest const& request) {
            EXPECT_EQ(db.FullName(), request.database());
            return MakeSessionsResponse({"test-session-name"});
          });

  // This test only expects the unary `ExecuteSql` RPC, so no streaming reader
  // mock is created.
  auto constexpr kText = R"pb(
    metadata: {}
    stats: { query_plan { plan_nodes: { index: 42 } } }
  )pb";
  spanner_proto::ResultSet response;
  ASSERT_TRUE(TextFormat::ParseFromString(kText, &response));
  EXPECT_CALL(*mock, ExecuteSql(_, _))
      .WillOnce(Return(Status(StatusCode::kUnavailable, "try-again")))
      .WillOnce(Return(ByMove(std::move(response))));

  auto result = conn->AnalyzeSql(
      {MakeSingleUseTransaction(Transaction::ReadOnlyOptions()),
       SqlStatement("select * from table")});

  auto constexpr kTextExpectedPlan = R"pb(
    plan_nodes: { index: 42 }
  )pb";
  google::spanner::v1::QueryPlan expected_plan;
  ASSERT_TRUE(TextFormat::ParseFromString(kTextExpectedPlan, &expected_plan));

  ASSERT_STATUS_OK(result);
  EXPECT_THAT(*result, IsProtoEqual(expected_plan));
}
1107
TEST(ConnectionImplTest, AnalyzeSqlGetSessionFailure) {
  auto database =
      Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  auto connection = MakeConnection(
      database, {stub}, ConnectionOptions{grpc::InsecureChannelCredentials()});

  // Session creation is rejected, after checking that the request names the
  // database under test.
  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(
          [&database](grpc::ClientContext&,
                      spanner_proto::BatchCreateSessionsRequest const& req) {
            EXPECT_EQ(database.FullName(), req.database());
            return Status(StatusCode::kPermissionDenied, "uh-oh in GetSession");
          });

  auto txn = MakeReadWriteTransaction(Transaction::ReadWriteOptions());
  auto outcome =
      connection->AnalyzeSql({txn, SqlStatement("delete * from table")});

  // The session failure is what the caller of `AnalyzeSql` must see.
  auto const& failure = outcome.status();
  EXPECT_EQ(StatusCode::kPermissionDenied, failure.code());
  EXPECT_THAT(failure.message(), HasSubstr("uh-oh in GetSession"));
}
1127
TEST(ConnectionImplTest, AnalyzeSqlDeletePermanentFailure) {
  auto database =
      Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  auto connection = MakeConnection(
      database, {stub}, ConnectionOptions{grpc::InsecureChannelCredentials()});

  Status const denied(StatusCode::kPermissionDenied, "uh-oh in ExecuteDml");
  {
    // Required RPC order: session creation, a rejected `ExecuteSql`, a
    // successful `BeginTransaction`, and a second rejected `ExecuteSql`.
    InSequence ordered;
    EXPECT_CALL(*stub, BatchCreateSessions(_, _))
        .WillOnce(Return(MakeSessionsResponse({"session-name"})));
    EXPECT_CALL(*stub, ExecuteSql(_, _)).WillOnce(Return(denied));
    EXPECT_CALL(*stub, BeginTransaction(_, _))
        .WillOnce(Return(MakeTestTransaction()));
    EXPECT_CALL(*stub, ExecuteSql(_, _)).WillOnce(Return(denied));
  }

  auto txn = MakeReadWriteTransaction(Transaction::ReadWriteOptions());
  auto outcome =
      connection->AnalyzeSql({txn, SqlStatement("delete * from table")});

  auto const& failure = outcome.status();
  EXPECT_EQ(StatusCode::kPermissionDenied, failure.code());
  EXPECT_THAT(failure.message(), HasSubstr("uh-oh in ExecuteDml"));
}
1151
TEST(ConnectionImplTest, AnalyzeSqlDeleteTooManyTransientFailures) {
  auto database =
      Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  auto connection = MakeLimitedRetryConnection(database, stub);

  Status const transient(StatusCode::kUnavailable, "try-again in ExecuteDml");
  {
    // Required RPC order: session creation, at least two failing
    // `ExecuteSql` attempts, a successful `BeginTransaction`, and at least
    // two more failing `ExecuteSql` attempts.
    InSequence ordered;
    EXPECT_CALL(*stub, BatchCreateSessions(_, _))
        .WillOnce(Return(MakeSessionsResponse({"session-name"})));
    EXPECT_CALL(*stub, ExecuteSql(_, _))
        .Times(AtLeast(2))
        .WillRepeatedly(Return(transient));
    EXPECT_CALL(*stub, BeginTransaction(_, _))
        .WillOnce(Return(MakeTestTransaction()));
    EXPECT_CALL(*stub, ExecuteSql(_, _))
        .Times(AtLeast(2))
        .WillRepeatedly(Return(transient));
  }

  auto txn = MakeReadWriteTransaction(Transaction::ReadWriteOptions());
  auto outcome =
      connection->AnalyzeSql({txn, SqlStatement("delete * from table")});

  auto const& failure = outcome.status();
  EXPECT_EQ(StatusCode::kUnavailable, failure.code());
  EXPECT_THAT(failure.message(), HasSubstr("try-again in ExecuteDml"));
}
1178
// Verifies that `ExecuteBatchDml` succeeds after one transient failure,
// reports one row count per statement, and records the session and
// transaction id in the transaction object.
TEST(ConnectionImplTest, ExecuteBatchDmlSuccess) {
  auto db = Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto mock = std::make_shared<spanner_testing::MockSpannerStub>();

  EXPECT_CALL(*mock, BatchCreateSessions(_, _))
      .WillOnce(Return(MakeSessionsResponse({"session-name"})));

  // One result set per statement; only the first carries the transaction id.
  auto constexpr kText = R"pb(
    result_sets: {
      metadata: { transaction: { id: "1234567890" } }
      stats: { row_count_exact: 0 }
    }
    result_sets: { stats: { row_count_exact: 1 } }
    result_sets: { stats: { row_count_exact: 2 } }
  )pb";
  spanner_proto::ExecuteBatchDmlResponse response;
  ASSERT_TRUE(TextFormat::ParseFromString(kText, &response));
  EXPECT_CALL(*mock, ExecuteBatchDml(_, _))
      .WillOnce(Return(Status(StatusCode::kUnavailable, "try-again")))
      .WillOnce(Return(response));

  auto request = {
      SqlStatement("update ..."),
      SqlStatement("update ..."),
      SqlStatement("update ..."),
  };

  auto conn = MakeConnection(
      db, {mock}, ConnectionOptions{grpc::InsecureChannelCredentials()});
  auto txn = MakeReadWriteTransaction();
  auto result = conn->ExecuteBatchDml({txn, request});
  // Fatal: every check below dereferences `result`.
  ASSERT_STATUS_OK(result);
  EXPECT_STATUS_OK(result->status);
  EXPECT_EQ(result->stats.size(), request.size());
  // Fatal: the element checks below index into `stats`.
  ASSERT_EQ(result->stats.size(), 3);
  EXPECT_EQ(result->stats[0].row_count, 0);
  EXPECT_EQ(result->stats[1].row_count, 1);
  EXPECT_EQ(result->stats[2].row_count, 2);
  EXPECT_THAT(txn, HasSessionAndTransactionId("session-name", "1234567890"));
}
1219
// Verifies that when the response carries fewer result sets than statements
// plus an error status, `ExecuteBatchDml` returns the partial row counts
// together with that status.
TEST(ConnectionImplTest, ExecuteBatchDmlPartialFailure) {
  auto db = Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto mock = std::make_shared<spanner_testing::MockSpannerStub>();

  EXPECT_CALL(*mock, BatchCreateSessions(_, _))
      .WillOnce(Return(MakeSessionsResponse({"session-name"})));

  // Two result sets for three statements, followed by the error status.
  auto constexpr kText = R"pb(
    result_sets: {
      metadata: { transaction: { id: "1234567890" } }
      stats: { row_count_exact: 42 }
    }
    result_sets: { stats: { row_count_exact: 43 } }
    status: { code: 2 message: "oops" }
  )pb";
  spanner_proto::ExecuteBatchDmlResponse response;
  ASSERT_TRUE(TextFormat::ParseFromString(kText, &response));
  EXPECT_CALL(*mock, ExecuteBatchDml(_, _)).WillOnce(Return(response));

  auto request = {
      SqlStatement("update ..."),
      SqlStatement("update ..."),
      SqlStatement("update ..."),
  };

  auto conn = MakeConnection(
      db, {mock}, ConnectionOptions{grpc::InsecureChannelCredentials()});
  auto txn = MakeReadWriteTransaction();
  auto result = conn->ExecuteBatchDml({txn, request});
  // Fatal: every check below dereferences `result`. The call itself succeeds;
  // the partial failure is reported in `result->status`.
  ASSERT_STATUS_OK(result);
  EXPECT_EQ(result->status.code(), StatusCode::kUnknown);
  EXPECT_EQ(result->status.message(), "oops");
  EXPECT_NE(result->stats.size(), request.size());
  // Fatal: the element checks below index into `stats`.
  ASSERT_EQ(result->stats.size(), 2);
  EXPECT_EQ(result->stats[0].row_count, 42);
  EXPECT_EQ(result->stats[1].row_count, 43);
  EXPECT_THAT(txn, HasSessionAndTransactionId("session-name", "1234567890"));
}
1258
TEST(ConnectionImplTest, ExecuteBatchDmlPermanentFailure) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  auto database =
      Database("dummy_project", "dummy_instance", "dummy_database_id");

  Status const denied(StatusCode::kPermissionDenied,
                      "uh-oh in ExecuteBatchDml");
  {
    // Required RPC order: session creation, a rejected `ExecuteBatchDml`, a
    // successful `BeginTransaction`, and a second rejected `ExecuteBatchDml`.
    InSequence ordered;
    EXPECT_CALL(*stub, BatchCreateSessions(_, _))
        .WillOnce(Return(MakeSessionsResponse({"session-name"})));
    EXPECT_CALL(*stub, ExecuteBatchDml(_, _)).WillOnce(Return(denied));
    EXPECT_CALL(*stub, BeginTransaction(_, _))
        .WillOnce(Return(MakeTestTransaction()));
    EXPECT_CALL(*stub, ExecuteBatchDml(_, _)).WillOnce(Return(denied));
  }

  auto statements = {
      SqlStatement("update ..."),
      SqlStatement("update ..."),
      SqlStatement("update ..."),
  };

  auto connection = MakeLimitedRetryConnection(database, stub);
  auto txn = MakeReadWriteTransaction();
  auto outcome = connection->ExecuteBatchDml({txn, statements});

  auto const& failure = outcome.status();
  EXPECT_EQ(StatusCode::kPermissionDenied, failure.code());
  EXPECT_THAT(failure.message(), HasSubstr("uh-oh in ExecuteBatchDml"));
}
1287
TEST(ConnectionImplTest, ExecuteBatchDmlTooManyTransientFailures) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  auto database =
      Database("dummy_project", "dummy_instance", "dummy_database_id");

  Status const transient(StatusCode::kUnavailable,
                         "try-again in ExecuteBatchDml");
  {
    // Required RPC order: session creation, at least two failing
    // `ExecuteBatchDml` attempts, a successful `BeginTransaction`, and at
    // least two more failing `ExecuteBatchDml` attempts.
    InSequence ordered;
    EXPECT_CALL(*stub, BatchCreateSessions(_, _))
        .WillOnce(Return(MakeSessionsResponse({"session-name"})));
    EXPECT_CALL(*stub, ExecuteBatchDml(_, _))
        .Times(AtLeast(2))
        .WillRepeatedly(Return(transient));
    EXPECT_CALL(*stub, BeginTransaction(_, _))
        .WillOnce(Return(MakeTestTransaction()));
    EXPECT_CALL(*stub, ExecuteBatchDml(_, _))
        .Times(AtLeast(2))
        .WillRepeatedly(Return(transient));
  }

  auto statements = {
      SqlStatement("update ..."),
      SqlStatement("update ..."),
      SqlStatement("update ..."),
  };

  auto connection = MakeLimitedRetryConnection(database, stub);
  auto txn = MakeReadWriteTransaction();
  auto outcome = connection->ExecuteBatchDml({txn, statements});

  auto const& failure = outcome.status();
  EXPECT_EQ(StatusCode::kUnavailable, failure.code());
  EXPECT_THAT(failure.message(), HasSubstr("try-again in ExecuteBatchDml"));
}
1321
// Verifies the handling of an `ExecuteBatchDml` response that has no result
// sets, only an error status: the call itself succeeds and the error is
// reported in the returned result's `status` field.
TEST(ConnectionImplTest, ExecuteBatchDmlNoResultSets) {
  auto db = Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto mock = std::make_shared<spanner_testing::MockSpannerStub>();

  {
    InSequence seq;
    EXPECT_CALL(*mock, BatchCreateSessions(_, _))
        .WillOnce(Return(MakeSessionsResponse({"session-name"})));
    // The `ExecuteBatchDml` call can succeed, but with no `ResultSet`s and an
    // error status in the response.
    auto constexpr kText =
        R"pb(status: { code: 6 message: "failed to insert ..." })pb";
    spanner_proto::ExecuteBatchDmlResponse response;
    ASSERT_TRUE(TextFormat::ParseFromString(kText, &response));
    // The first attempt asks the RPC to begin the transaction. After the
    // explicit `BeginTransaction`, the second attempt must carry the id that
    // call returned.
    EXPECT_CALL(
        *mock, ExecuteBatchDml(
                   _, ReadRequestHasSessionAndBeginTransaction("session-name")))
        .WillOnce(Return(response));
    EXPECT_CALL(*mock, BeginTransaction(_, _))
        .WillOnce(Return(MakeTestTransaction("BD000001")));
    EXPECT_CALL(*mock, ExecuteBatchDml(_, ReadRequestHasSessionAndTransactionId(
                                              "session-name", "BD000001")))
        .WillOnce(Return(response));
  }

  auto request = {SqlStatement("update ...")};
  auto conn = MakeConnection(
      db, {mock}, ConnectionOptions{grpc::InsecureChannelCredentials()});
  auto txn = MakeReadWriteTransaction();
  auto result = conn->ExecuteBatchDml({txn, request});
  // Fatal: the check below dereferences `result`.
  ASSERT_STATUS_OK(result);
  EXPECT_THAT(result->status, StatusIs(StatusCode::kAlreadyExists,
                                       HasSubstr("failed to insert ...")));
}
1356
TEST(ConnectionImplTest, ExecutePartitionedDmlDeleteSuccess) {
  auto database =
      Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  auto connection = MakeConnection(
      database, {stub}, ConnectionOptions{grpc::InsecureChannelCredentials()});

  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(Return(MakeSessionsResponse({"session-name"})));

  // Both RPCs fail once with a transient error and then succeed.
  Status const transient(StatusCode::kUnavailable, "try-again");

  auto constexpr kBegunTxnText = R"pb(
    id: "1234567890"
  )pb";
  spanner_proto::Transaction begun_txn;
  ASSERT_TRUE(TextFormat::ParseFromString(kBegunTxnText, &begun_txn));
  EXPECT_CALL(*stub, BeginTransaction(_, _))
      .WillOnce(Return(transient))
      .WillOnce(Return(begun_txn));

  auto constexpr kReplyText = R"pb(
    metadata: { transaction: { id: "1234567890" } }
    stats: { row_count_lower_bound: 42 }
  )pb";
  spanner_proto::ResultSet reply;
  ASSERT_TRUE(TextFormat::ParseFromString(kReplyText, &reply));
  EXPECT_CALL(*stub, ExecuteSql(_, _))
      .WillOnce(Return(transient))
      .WillOnce(Return(reply));

  auto outcome =
      connection->ExecutePartitionedDml({SqlStatement("delete * from table")});

  // The lower bound in the reply's stats is what the caller must see.
  ASSERT_STATUS_OK(outcome);
  EXPECT_EQ(outcome->row_count_lower_bound, 42);
}
1391
TEST(ConnectionImplTest, ExecutePartitionedDmlGetSessionFailure) {
  auto database =
      Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  auto connection = MakeConnection(
      database, {stub}, ConnectionOptions{grpc::InsecureChannelCredentials()});

  // Session creation is rejected, after checking that the request names the
  // database under test.
  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(
          [&database](grpc::ClientContext&,
                      spanner_proto::BatchCreateSessionsRequest const& req) {
            EXPECT_EQ(database.FullName(), req.database());
            return Status(StatusCode::kPermissionDenied, "uh-oh in GetSession");
          });

  auto outcome =
      connection->ExecutePartitionedDml({SqlStatement("delete * from table")});

  auto const& failure = outcome.status();
  EXPECT_EQ(StatusCode::kPermissionDenied, failure.code());
  EXPECT_THAT(failure.message(), HasSubstr("uh-oh in GetSession"));
}
1411
TEST(ConnectionImplTest, ExecutePartitionedDmlDeletePermanentFailure) {
  auto database =
      Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  auto connection = MakeConnection(
      database, {stub}, ConnectionOptions{grpc::InsecureChannelCredentials()});

  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(Return(MakeSessionsResponse({"session-name"})));

  // `BeginTransaction` fails once with a transient error, then succeeds.
  auto constexpr kBegunTxnText = R"pb(
    id: "1234567890"
  )pb";
  spanner_proto::Transaction begun_txn;
  ASSERT_TRUE(TextFormat::ParseFromString(kBegunTxnText, &begun_txn));
  Status const transient(StatusCode::kUnavailable, "try-again");
  EXPECT_CALL(*stub, BeginTransaction(_, _))
      .WillOnce(Return(transient))
      .WillOnce(Return(begun_txn));

  // The DML itself is rejected; exactly one `ExecuteSql` call is expected.
  Status const denied(StatusCode::kPermissionDenied,
                      "uh-oh in ExecutePartitionedDml");
  EXPECT_CALL(*stub, ExecuteSql(_, _)).WillOnce(Return(denied));

  auto outcome =
      connection->ExecutePartitionedDml({SqlStatement("delete * from table")});

  auto const& failure = outcome.status();
  EXPECT_EQ(StatusCode::kPermissionDenied, failure.code());
  EXPECT_THAT(failure.message(), HasSubstr("uh-oh in ExecutePartitionedDml"));
}
1441
TEST(ConnectionImplTest, ExecutePartitionedDmlDeleteTooManyTransientFailures) {
  auto database =
      Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  auto connection = MakeLimitedRetryConnection(database, stub);

  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(Return(MakeSessionsResponse({"session-name"})));

  // `BeginTransaction` fails once with a transient error, then succeeds.
  auto constexpr kBegunTxnText = R"pb(
    id: "1234567890"
  )pb";
  spanner_proto::Transaction begun_txn;
  ASSERT_TRUE(TextFormat::ParseFromString(kBegunTxnText, &begun_txn));
  Status const begin_transient(StatusCode::kUnavailable, "try-again");
  EXPECT_CALL(*stub, BeginTransaction(_, _))
      .WillOnce(Return(begin_transient))
      .WillOnce(Return(begun_txn));

  // Every `ExecuteSql` attempt fails with the same transient error.
  Status const sql_transient(StatusCode::kUnavailable,
                             "try-again in ExecutePartitionedDml");
  EXPECT_CALL(*stub, ExecuteSql(_, _))
      .Times(AtLeast(2))
      .WillRepeatedly(Return(sql_transient));

  auto outcome =
      connection->ExecutePartitionedDml({SqlStatement("delete * from table")});

  auto const& failure = outcome.status();
  EXPECT_EQ(StatusCode::kUnavailable, failure.code());
  EXPECT_THAT(failure.message(),
              HasSubstr("try-again in ExecutePartitionedDml"));
}
1471
TEST(ConnectionImplTest,
     ExecutePartitionedDmlDeleteBeginTransactionPermanentFailure) {
  auto database =
      Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  auto connection = MakeConnection(
      database, {stub}, ConnectionOptions{grpc::InsecureChannelCredentials()});

  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(Return(MakeSessionsResponse({"session-name"})));

  // The single expected `BeginTransaction` call is rejected, and no
  // expectation is set for `ExecuteSql`.
  Status const denied(StatusCode::kPermissionDenied,
                      "uh-oh in ExecutePartitionedDml");
  EXPECT_CALL(*stub, BeginTransaction(_, _)).WillOnce(Return(denied));

  auto outcome =
      connection->ExecutePartitionedDml({SqlStatement("delete * from table")});

  auto const& failure = outcome.status();
  EXPECT_EQ(StatusCode::kPermissionDenied, failure.code());
  EXPECT_THAT(failure.message(), HasSubstr("uh-oh in ExecutePartitionedDml"));
}
1493
TEST(ConnectionImplTest,
     ExecutePartitionedDmlDeleteBeginTransactionTooManyTransientFailures) {
  auto database =
      Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  auto connection = MakeLimitedRetryConnection(database, stub);

  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(Return(MakeSessionsResponse({"session-name"})));

  // Every `BeginTransaction` attempt fails with the same transient error, and
  // no expectation is set for `ExecuteSql`.
  Status const transient(StatusCode::kUnavailable,
                         "try-again in ExecutePartitionedDml");
  EXPECT_CALL(*stub, BeginTransaction(_, _))
      .Times(AtLeast(2))
      .WillRepeatedly(Return(transient));

  auto outcome =
      connection->ExecutePartitionedDml({SqlStatement("delete * from table")});

  auto const& failure = outcome.status();
  EXPECT_EQ(StatusCode::kUnavailable, failure.code());
  EXPECT_THAT(failure.message(),
              HasSubstr("try-again in ExecutePartitionedDml"));
}
1515
TEST(ConnectionImplTest, CommitGetSessionPermanentFailure) {
  auto database =
      Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  auto connection = MakeLimitedRetryConnection(database, stub);

  // Exactly one session-creation attempt is expected, and it is rejected.
  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(
          [&database](grpc::ClientContext&,
                      spanner_proto::BatchCreateSessionsRequest const& req) {
            EXPECT_EQ(database.FullName(), req.database());
            return Status(StatusCode::kPermissionDenied, "uh-oh in GetSession");
          });

  auto outcome = connection->Commit({MakeReadWriteTransaction()});

  auto const& failure = outcome.status();
  EXPECT_EQ(StatusCode::kPermissionDenied, failure.code());
  EXPECT_THAT(failure.message(), HasSubstr("uh-oh in GetSession"));
}
1533
// `Commit()` must report the transient error when session creation keeps
// failing with `kUnavailable`.
TEST(ConnectionImplTest, CommitGetSessionTooManyTransientFailures) {
  auto database =
      Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  auto connection = MakeLimitedRetryConnection(database, stub);

  // Every attempt fails; at least one retry is required.
  auto always_unavailable =
      [&database](grpc::ClientContext&,
                  spanner_proto::BatchCreateSessionsRequest const& request) {
        EXPECT_EQ(database.FullName(), request.database());
        return Status(StatusCode::kUnavailable, "try-again in GetSession");
      };
  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .Times(AtLeast(2))
      .WillRepeatedly(always_unavailable);

  EXPECT_THAT(
      connection->Commit({MakeReadWriteTransaction()}),
      StatusIs(StatusCode::kUnavailable, HasSubstr("try-again in GetSession")));
}
1552
// A transient session-creation failure is retried; the session obtained on
// the second attempt is then used for the `Commit()` request.
TEST(ConnectionImplTest, CommitGetSessionRetry) {
  auto database =
      Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  auto connection = MakeLimitedRetryConnection(database, stub);
  spanner_proto::Transaction begin_response = MakeTestTransaction();

  // First attempt: transient failure. Second attempt: a usable session.
  auto sessions_unavailable =
      [&database](grpc::ClientContext&,
                  spanner_proto::BatchCreateSessionsRequest const& request) {
        EXPECT_EQ(database.FullName(), request.database());
        return Status(StatusCode::kUnavailable, "try-again in GetSession");
      };
  auto sessions_created =
      [&database](grpc::ClientContext&,
                  spanner_proto::BatchCreateSessionsRequest const& request) {
        EXPECT_EQ(database.FullName(), request.database());
        return MakeSessionsResponse({"test-session-name"});
      };
  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(sessions_unavailable)
      .WillOnce(sessions_created);

  EXPECT_CALL(*stub, BeginTransaction(_, _)).WillOnce(Return(begin_response));

  // The commit itself fails permanently; that error is what the caller sees.
  auto reject_commit = [&begin_response](
                           grpc::ClientContext&,
                           spanner_proto::CommitRequest const& request) {
    EXPECT_EQ("test-session-name", request.session());
    EXPECT_EQ(begin_response.id(), request.transaction_id());
    return Status(StatusCode::kPermissionDenied, "uh-oh in Commit");
  };
  EXPECT_CALL(*stub, Commit(_, _)).WillOnce(reject_commit);

  EXPECT_THAT(
      connection->Commit({MakeReadWriteTransaction()}),
      StatusIs(StatusCode::kPermissionDenied, HasSubstr("uh-oh in Commit")));
}
1584
// A transient `BeginTransaction()` failure is retried, after which `Commit()`
// succeeds and returns the commit timestamp produced by the stub.
TEST(ConnectionImplTest, CommitBeginTransactionRetry) {
  auto mock = std::make_shared<spanner_testing::MockSpannerStub>();

  spanner_proto::Transaction txn = MakeTestTransaction();
  auto db = Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto conn = MakeLimitedRetryConnection(db, mock);
  EXPECT_CALL(*mock, BatchCreateSessions(_, _))
      .WillOnce(
          [&db](grpc::ClientContext&,
                spanner_proto::BatchCreateSessionsRequest const& request) {
            EXPECT_EQ(db.FullName(), request.database());
            return MakeSessionsResponse({"test-session-name"});
          });
  // First attempt fails with a retryable error, second attempt succeeds.
  EXPECT_CALL(*mock, BeginTransaction(_, _))
      .WillOnce(Return(Status(StatusCode::kUnavailable, "try-again")))
      .WillOnce(Return(txn));
  auto const commit_timestamp =
      MakeTimestamp(std::chrono::system_clock::from_time_t(123)).value();
  EXPECT_CALL(*mock, Commit(_, _))
      .WillOnce([&txn, commit_timestamp](
                    grpc::ClientContext&,
                    spanner_proto::CommitRequest const& request) {
        EXPECT_EQ("test-session-name", request.session());
        EXPECT_EQ(txn.id(), request.transaction_id());
        spanner_proto::CommitResponse response;
        *response.mutable_commit_timestamp() =
            internal::TimestampToProto(commit_timestamp);
        return response;
      });

  auto commit = conn->Commit({MakeReadWriteTransaction()});
  // Use a fatal assertion: `commit` is dereferenced below, which is only
  // valid when it holds a value.
  ASSERT_STATUS_OK(commit);
  EXPECT_EQ(commit_timestamp, commit->commit_timestamp);
}
1619
// When `BeginTransaction()` reports "Session not found", `Commit()` fails with
// that error and the transaction is flagged as having a bad session.
TEST(ConnectionImplTest, CommitBeginTransactionSessionNotFound) {
  auto database =
      Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  auto connection = MakeLimitedRetryConnection(database, stub);

  auto create_session =
      [&database](grpc::ClientContext&,
                  spanner_proto::BatchCreateSessionsRequest const& request) {
        EXPECT_EQ(database.FullName(), request.database());
        return MakeSessionsResponse({"test-session-name"});
      };
  EXPECT_CALL(*stub, BatchCreateSessions(_, _)).WillOnce(create_session);

  Status const session_gone(StatusCode::kNotFound, "Session not found");
  EXPECT_CALL(*stub, BeginTransaction(_, _)).WillOnce(Return(session_gone));

  auto transaction = MakeReadWriteTransaction();
  auto commit_result = connection->Commit({transaction});
  EXPECT_FALSE(commit_result.ok());

  auto error = commit_result.status();
  EXPECT_TRUE(IsSessionNotFound(error)) << error;
  EXPECT_THAT(transaction, HasBadSession());
}
1640
// A permanent `BeginTransaction()` failure is returned by `Commit()`, and a
// second `Commit()` on the same transaction reports the same error.
TEST(ConnectionImplTest, CommitBeginTransactionPermanentFailure) {
  auto database =
      Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  auto connection = MakeLimitedRetryConnection(database, stub);

  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(Return(MakeSessionsResponse({"session-name"})));

  Status const begin_error(StatusCode::kInvalidArgument,
                           "BeginTransaction failed");
  EXPECT_CALL(*stub, BeginTransaction(_, _)).WillOnce(Return(begin_error));

  auto transaction = MakeReadWriteTransaction();

  // The mock allows exactly one `BeginTransaction()` call, so the second
  // attempt must produce the same error without calling it again.
  for (int attempt = 0; attempt != 2; ++attempt) {
    SCOPED_TRACE("Commit attempt " + std::to_string(attempt));
    EXPECT_THAT(connection->Commit({transaction}),
                StatusIs(StatusCode::kInvalidArgument,
                         HasSubstr("BeginTransaction failed")));
  }
}
1661
// A permanent error from the `Commit()` RPC is returned to the caller as-is.
TEST(ConnectionImplTest, CommitCommitPermanentFailure) {
  auto database =
      Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  auto connection = MakeLimitedRetryConnection(database, stub);
  spanner_proto::Transaction begin_response = MakeTestTransaction();

  auto create_session =
      [&database](grpc::ClientContext&,
                  spanner_proto::BatchCreateSessionsRequest const& request) {
        EXPECT_EQ(database.FullName(), request.database());
        return MakeSessionsResponse({"test-session-name"});
      };
  EXPECT_CALL(*stub, BatchCreateSessions(_, _)).WillOnce(create_session);
  EXPECT_CALL(*stub, BeginTransaction(_, _)).WillOnce(Return(begin_response));

  // The request must carry the session and the transaction id created above.
  auto reject_commit = [&begin_response](
                           grpc::ClientContext&,
                           spanner_proto::CommitRequest const& request) {
    EXPECT_EQ("test-session-name", request.session());
    EXPECT_EQ(begin_response.id(), request.transaction_id());
    return Status(StatusCode::kPermissionDenied, "uh-oh in Commit");
  };
  EXPECT_CALL(*stub, Commit(_, _)).WillOnce(reject_commit);

  EXPECT_THAT(
      connection->Commit({MakeReadWriteTransaction()}),
      StatusIs(StatusCode::kPermissionDenied, HasSubstr("uh-oh in Commit")));
}
1687
// `Commit()` must report the transient error when every `Commit()` RPC
// attempt fails with `kUnavailable`.
//
// NOTE(review): this test was previously a byte-for-byte copy of
// `CommitCommitPermanentFailure` (a single `kPermissionDenied`), so the
// transient-failure path named in the test was never exercised. The
// expectations below assume the `Commit()` RPC is retried on `kUnavailable`
// when the transaction id comes from `BeginTransaction()` -- confirm against
// the retry loop in `ConnectionImpl`.
TEST(ConnectionImplTest, CommitCommitTooManyTransientFailures) {
  auto mock = std::make_shared<spanner_testing::MockSpannerStub>();

  spanner_proto::Transaction txn = MakeTestTransaction();
  auto db = Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto conn = MakeLimitedRetryConnection(db, mock);
  EXPECT_CALL(*mock, BatchCreateSessions(_, _))
      .WillOnce(
          [&db](grpc::ClientContext&,
                spanner_proto::BatchCreateSessionsRequest const& request) {
            EXPECT_EQ(db.FullName(), request.database());
            return MakeSessionsResponse({"test-session-name"});
          });
  EXPECT_CALL(*mock, BeginTransaction(_, _)).WillOnce(Return(txn));
  // Every attempt fails with a retryable error; at least one retry must be
  // observed before the connection gives up.
  EXPECT_CALL(*mock, Commit(_, _))
      .Times(AtLeast(2))
      .WillRepeatedly([&txn](grpc::ClientContext&,
                             spanner_proto::CommitRequest const& request) {
        EXPECT_EQ("test-session-name", request.session());
        EXPECT_EQ(txn.id(), request.transaction_id());
        return Status(StatusCode::kUnavailable, "try-again in Commit");
      });
  auto commit = conn->Commit({MakeReadWriteTransaction()});
  EXPECT_EQ(StatusCode::kUnavailable, commit.status().code());
  EXPECT_THAT(commit.status().message(), HasSubstr("try-again in Commit"));
}
1713
// Committing a transaction that was already invalidated returns the stored
// error and makes no calls on the stub.
TEST(ConnectionImplTest, CommitCommitInvalidatedTransaction) {
  auto database =
      Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  auto connection = MakeConnection(
      database, {stub}, ConnectionOptions{grpc::InsecureChannelCredentials()});

  // None of these RPCs may be issued.
  EXPECT_CALL(*stub, BatchCreateSessions(_, _)).Times(0);
  EXPECT_CALL(*stub, BeginTransaction(_, _)).Times(0);
  EXPECT_CALL(*stub, Commit(_, _)).Times(0);

  auto transaction = MakeReadWriteTransaction();
  Status const invalidation(StatusCode::kAlreadyExists, "constraint error");
  SetTransactionInvalid(transaction, invalidation);

  auto commit_result = connection->Commit({transaction});
  EXPECT_FALSE(commit_result.ok());
  EXPECT_THAT(commit_result, StatusIs(StatusCode::kAlreadyExists,
                                      HasSubstr("constraint error")));
}
1735
// With a pre-assigned transaction id, a transient `Commit()` failure is
// retried and the second attempt's commit timestamp is returned.
TEST(ConnectionImplTest, CommitCommitIdempotentTransientSuccess) {
  auto mock = std::make_shared<spanner_testing::MockSpannerStub>();

  auto db = Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto conn = MakeConnection(
      db, {mock}, ConnectionOptions{grpc::InsecureChannelCredentials()});
  EXPECT_CALL(*mock, BatchCreateSessions(_, _))
      .WillOnce(
          [&db](grpc::ClientContext&,
                spanner_proto::BatchCreateSessionsRequest const& request) {
            EXPECT_EQ(db.FullName(), request.database());
            return MakeSessionsResponse({"test-session-name"});
          });
  auto const commit_timestamp =
      MakeTimestamp(std::chrono::system_clock::from_time_t(123)).value();
  // First attempt: retryable failure. Second attempt: success.
  EXPECT_CALL(*mock, Commit(_, _))
      .WillOnce([](grpc::ClientContext&,
                   spanner_proto::CommitRequest const& request) {
        EXPECT_EQ("test-session-name", request.session());
        EXPECT_EQ("test-txn-id", request.transaction_id());
        return Status(StatusCode::kUnavailable, "try-again");
      })
      .WillOnce(
          [commit_timestamp](grpc::ClientContext&,
                             spanner_proto::CommitRequest const& request) {
            EXPECT_EQ("test-session-name", request.session());
            EXPECT_EQ("test-txn-id", request.transaction_id());
            spanner_proto::CommitResponse response;
            *response.mutable_commit_timestamp() =
                internal::TimestampToProto(commit_timestamp);
            return response;
          });

  // Set the id because that makes the commit idempotent.
  auto txn = MakeReadWriteTransaction();
  SetTransactionId(txn, "test-txn-id");

  auto commit = conn->Commit({txn});
  // Use a fatal assertion: `commit` is dereferenced below, which is only
  // valid when it holds a value.
  ASSERT_STATUS_OK(commit);
  EXPECT_EQ(commit_timestamp, commit->commit_timestamp);
}
1777
// A transaction with a pre-assigned id is committed using that id and the
// pooled session; no `BeginTransaction()` expectation is needed.
TEST(ConnectionImplTest, CommitSuccessWithTransactionId) {
  auto database =
      Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  auto connection = MakeConnection(
      database, {stub}, ConnectionOptions{grpc::InsecureChannelCredentials()});

  auto create_session =
      [&database](grpc::ClientContext&,
                  spanner_proto::BatchCreateSessionsRequest const& request) {
        EXPECT_EQ(database.FullName(), request.database());
        return MakeSessionsResponse({"test-session-name"});
      };
  EXPECT_CALL(*stub, BatchCreateSessions(_, _)).WillOnce(create_session);

  auto accept_commit = [](grpc::ClientContext&,
                          spanner_proto::CommitRequest const& request) {
    EXPECT_EQ("test-session-name", request.session());
    EXPECT_EQ("test-txn-id", request.transaction_id());
    auto const timestamp =
        MakeTimestamp(std::chrono::system_clock::from_time_t(123)).value();
    spanner_proto::CommitResponse response;
    *response.mutable_commit_timestamp() =
        internal::TimestampToProto(timestamp);
    return response;
  };
  EXPECT_CALL(*stub, Commit(_, _)).WillOnce(accept_commit);

  // Assign the id up front; the commit request must carry it verbatim.
  auto transaction = MakeReadWriteTransaction();
  SetTransactionId(transaction, "test-txn-id");

  auto commit_result = connection->Commit({transaction});
  EXPECT_STATUS_OK(commit_result);
}
1809
// `Rollback()` returns the session-creation error and never issues the
// `Rollback()` RPC.
TEST(ConnectionImplTest, RollbackGetSessionFailure) {
  auto database = Database("project", "instance", "database");
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();

  auto deny_sessions =
      [&database](grpc::ClientContext&,
                  spanner_proto::BatchCreateSessionsRequest const& request) {
        EXPECT_EQ(database.FullName(), request.database());
        return Status(StatusCode::kPermissionDenied, "uh-oh in GetSession");
      };
  EXPECT_CALL(*stub, BatchCreateSessions(_, _)).WillOnce(deny_sessions);
  EXPECT_CALL(*stub, Rollback(_, _)).Times(0);

  auto connection = MakeConnection(
      database, {stub}, ConnectionOptions{grpc::InsecureChannelCredentials()});
  auto transaction = MakeReadWriteTransaction();
  SetTransactionId(transaction, "test-txn-id");

  auto const outcome = connection->Rollback({transaction});
  EXPECT_EQ(StatusCode::kPermissionDenied, outcome.code());
  EXPECT_THAT(outcome.message(), HasSubstr("uh-oh in GetSession"));
}
1831
// Rolling back a transaction without an id first begins it, then rolls back
// using the id returned by `BeginTransaction()`.
TEST(ConnectionImplTest, RollbackBeginTransaction) {
  auto database = Database("project", "instance", "database");
  std::string const session_name = "test-session-name";
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();

  auto create_session =
      [&database, &session_name](
          grpc::ClientContext&,
          spanner_proto::BatchCreateSessionsRequest const& request) {
        EXPECT_EQ(database.FullName(), request.database());
        return MakeSessionsResponse({session_name});
      };
  EXPECT_CALL(*stub, BatchCreateSessions(_, _)).WillOnce(create_session);

  EXPECT_CALL(*stub, BeginTransaction(_, _))
      .WillOnce(Return(MakeTestTransaction("RollbackBeginTransaction")));

  // The rollback request must name the session and the freshly begun
  // transaction.
  auto accept_rollback = [&session_name](
                             grpc::ClientContext&,
                             spanner_proto::RollbackRequest const& request) {
    EXPECT_EQ(session_name, request.session());
    EXPECT_EQ("RollbackBeginTransaction", request.transaction_id());
    return Status();
  };
  EXPECT_CALL(*stub, Rollback(_, _)).WillOnce(accept_rollback);

  auto connection = MakeConnection(
      database, {stub}, ConnectionOptions{grpc::InsecureChannelCredentials()});
  auto transaction = MakeReadWriteTransaction();
  auto const outcome = connection->Rollback({transaction});
  EXPECT_STATUS_OK(outcome);
}
1860
// Rolling back a single-use transaction is rejected with `kInvalidArgument`
// and makes no session-creation or rollback calls on the stub.
TEST(ConnectionImplTest, RollbackSingleUseTransaction) {
  auto database = Database("project", "instance", "database");
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();

  EXPECT_CALL(*stub, BatchCreateSessions(_, _)).Times(0);
  EXPECT_CALL(*stub, Rollback(_, _)).Times(0);

  auto connection = MakeConnection(
      database, {stub}, ConnectionOptions{grpc::InsecureChannelCredentials()});
  auto single_use = internal::MakeSingleUseTransaction(
      Transaction::SingleUseOptions{Transaction::ReadOnlyOptions{}});

  auto const outcome = connection->Rollback({single_use});
  EXPECT_EQ(StatusCode::kInvalidArgument, outcome.code());
  EXPECT_THAT(outcome.message(), HasSubstr("Cannot rollback"));
}
1877
// A permanent error from the `Rollback()` RPC is returned to the caller.
TEST(ConnectionImplTest, RollbackPermanentFailure) {
  std::string const session_name = "test-session-name";
  std::string const transaction_id = "test-txn-id";
  auto database = Database("project", "instance", "database");
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();

  auto create_session =
      [&database, &session_name](
          grpc::ClientContext&,
          spanner_proto::BatchCreateSessionsRequest const& request) {
        EXPECT_EQ(database.FullName(), request.database());
        return MakeSessionsResponse({session_name});
      };
  EXPECT_CALL(*stub, BatchCreateSessions(_, _)).WillOnce(create_session);

  // Exactly one attempt is expected; the error is not retryable.
  auto reject_rollback = [&session_name, &transaction_id](
                             grpc::ClientContext&,
                             spanner_proto::RollbackRequest const& request) {
    EXPECT_EQ(session_name, request.session());
    EXPECT_EQ(transaction_id, request.transaction_id());
    return Status(StatusCode::kPermissionDenied, "uh-oh in Rollback");
  };
  EXPECT_CALL(*stub, Rollback(_, _)).WillOnce(reject_rollback);

  auto connection = MakeLimitedRetryConnection(database, stub);
  auto transaction = MakeReadWriteTransaction();
  SetTransactionId(transaction, transaction_id);

  auto const outcome = connection->Rollback({transaction});
  EXPECT_EQ(StatusCode::kPermissionDenied, outcome.code());
  EXPECT_THAT(outcome.message(), HasSubstr("uh-oh in Rollback"));
}
1907
// `Rollback()` must report the transient error when every `Rollback()` RPC
// attempt fails with `kUnavailable`.
TEST(ConnectionImplTest, RollbackTooManyTransientFailures) {
  std::string const session_name = "test-session-name";
  std::string const transaction_id = "test-txn-id";
  auto database = Database("project", "instance", "database");
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();

  auto create_session =
      [&database, &session_name](
          grpc::ClientContext&,
          spanner_proto::BatchCreateSessionsRequest const& request) {
        EXPECT_EQ(database.FullName(), request.database());
        return MakeSessionsResponse({session_name});
      };
  EXPECT_CALL(*stub, BatchCreateSessions(_, _)).WillOnce(create_session);

  // Every attempt fails; at least one retry is required.
  auto always_unavailable =
      [&session_name, &transaction_id](
          grpc::ClientContext&,
          spanner_proto::RollbackRequest const& request) {
        EXPECT_EQ(session_name, request.session());
        EXPECT_EQ(transaction_id, request.transaction_id());
        return Status(StatusCode::kUnavailable, "try-again in Rollback");
      };
  EXPECT_CALL(*stub, Rollback(_, _))
      .Times(AtLeast(2))
      .WillRepeatedly(always_unavailable);

  auto connection = MakeLimitedRetryConnection(database, stub);
  auto transaction = MakeReadWriteTransaction();
  SetTransactionId(transaction, transaction_id);

  auto const outcome = connection->Rollback({transaction});
  EXPECT_EQ(StatusCode::kUnavailable, outcome.code());
  EXPECT_THAT(outcome.message(), HasSubstr("try-again in Rollback"));
}
1938
// A transient `Rollback()` failure is retried and the second attempt
// succeeds.
TEST(ConnectionImplTest, RollbackSuccess) {
  std::string const session_name = "test-session-name";
  std::string const transaction_id = "test-txn-id";
  auto database = Database("project", "instance", "database");
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();

  auto create_session =
      [&database, &session_name](
          grpc::ClientContext&,
          spanner_proto::BatchCreateSessionsRequest const& request) {
        EXPECT_EQ(database.FullName(), request.database());
        return MakeSessionsResponse({session_name});
      };
  EXPECT_CALL(*stub, BatchCreateSessions(_, _)).WillOnce(create_session);

  // First attempt: retryable failure. Second attempt: verified and accepted.
  Status const transient_error(StatusCode::kUnavailable, "try-again");
  auto accept_rollback = [&session_name, &transaction_id](
                             grpc::ClientContext&,
                             spanner_proto::RollbackRequest const& request) {
    EXPECT_EQ(session_name, request.session());
    EXPECT_EQ(transaction_id, request.transaction_id());
    return Status();
  };
  EXPECT_CALL(*stub, Rollback(_, _))
      .WillOnce(Return(transient_error))
      .WillOnce(accept_rollback);

  auto connection = MakeConnection(
      database, {stub}, ConnectionOptions{grpc::InsecureChannelCredentials()});
  auto transaction = MakeReadWriteTransaction();
  SetTransactionId(transaction, transaction_id);

  auto const outcome = connection->Rollback({transaction});
  EXPECT_STATUS_OK(outcome);
}
1969
// Rolling back a transaction that was already invalidated returns the stored
// error and makes no session-creation or rollback calls on the stub.
TEST(ConnectionImplTest, RollbackInvalidatedTransaction) {
  auto database =
      Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  auto connection = MakeConnection(
      database, {stub}, ConnectionOptions{grpc::InsecureChannelCredentials()});

  EXPECT_CALL(*stub, BatchCreateSessions(_, _)).Times(0);
  EXPECT_CALL(*stub, Rollback(_, _)).Times(0);

  // The status recorded here is what `Rollback()` is expected to hand back.
  auto transaction = MakeReadWriteTransaction();
  Status const invalidation(StatusCode::kAlreadyExists, "constraint error");
  SetTransactionInvalid(transaction, invalidation);

  auto const outcome = connection->Rollback({transaction});
  EXPECT_EQ(StatusCode::kAlreadyExists, outcome.code());
  EXPECT_THAT(outcome.message(), HasSubstr("constraint error"));
}
1988
// `PartitionRead()` retries a transient failure, then converts the returned
// partition tokens into `ReadPartition` objects bound to the session and the
// transaction id from the response.
TEST(ConnectionImplTest, PartitionReadSuccess) {
  auto database =
      Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  auto connection = MakeConnection(
      database, {stub}, ConnectionOptions{grpc::InsecureChannelCredentials()});

  auto create_session =
      [&database](grpc::ClientContext&,
                  spanner_proto::BatchCreateSessionsRequest const& request) {
        EXPECT_EQ(database.FullName(), request.database());
        return MakeSessionsResponse({"test-session-name"});
      };
  EXPECT_CALL(*stub, BatchCreateSessions(_, _)).WillOnce(create_session);

  // The exact request the connection is expected to send.
  auto constexpr kExpectedRequestText = R"pb(
    session: "test-session-name"
    transaction: {
      begin { read_only { strong: true return_read_timestamp: true } }
    }
    table: "table"
    columns: "UserId"
    columns: "UserName"
    key_set: { all: true }
    partition_options: {}
  )pb";
  google::spanner::v1::PartitionReadRequest expected_request;
  ASSERT_TRUE(
      TextFormat::ParseFromString(kExpectedRequestText, &expected_request));

  // The response handed back by the stub on the second attempt.
  auto constexpr kResponseText = R"pb(
    partitions: { partition_token: "BADDECAF" }
    partitions: { partition_token: "DEADBEEF" }
    transaction: { id: "CAFEDEAD" }
  )pb";
  google::spanner::v1::PartitionResponse stub_response;
  ASSERT_TRUE(TextFormat::ParseFromString(kResponseText, &stub_response));

  Status const transient_error(StatusCode::kUnavailable, "try-again");
  EXPECT_CALL(*stub, PartitionRead(_, IsProtoEqual(expected_request)))
      .WillOnce(Return(transient_error))
      .WillOnce(Return(stub_response));

  Transaction txn = MakeReadOnlyTransaction(Transaction::ReadOnlyOptions());
  StatusOr<std::vector<ReadPartition>> partitions = connection->PartitionRead(
      {{txn, "table", KeySet::All(), {"UserId", "UserName"}}});
  ASSERT_STATUS_OK(partitions);
  EXPECT_THAT(txn, HasSessionAndTransactionId("test-session-name", "CAFEDEAD"));

  // One partition per token; the order of the result is not asserted.
  std::vector<ReadPartition> const expected = {
      internal::MakeReadPartition("CAFEDEAD", "test-session-name", "BADDECAF",
                                  "table", KeySet::All(),
                                  {"UserId", "UserName"}),
      internal::MakeReadPartition("CAFEDEAD", "test-session-name", "DEADBEEF",
                                  "table", KeySet::All(),
                                  {"UserId", "UserName"})};
  EXPECT_THAT(*partitions,
              testing::UnorderedPointwise(testing::Eq(), expected));
}
2049
// `PartitionRead()` returns the permanent error. The expected call sequence
// is: create sessions, a failing `PartitionRead()`, a `BeginTransaction()`,
// and a second failing `PartitionRead()`.
TEST(ConnectionImplTest, PartitionReadPermanentFailure) {
  auto database =
      Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  auto connection = MakeLimitedRetryConnection(database, stub);

  {
    // All expectations in this scope must be satisfied in declaration order.
    InSequence ordered;

    auto create_session =
        [&database](grpc::ClientContext&,
                    spanner_proto::BatchCreateSessionsRequest const& request) {
          EXPECT_EQ(database.FullName(), request.database());
          return MakeSessionsResponse({"test-session-name"});
        };
    EXPECT_CALL(*stub, BatchCreateSessions(_, _)).WillOnce(create_session);

    Status const permanent_error(StatusCode::kPermissionDenied, "uh-oh");
    EXPECT_CALL(*stub, PartitionRead(_, _)).WillOnce(Return(permanent_error));
    EXPECT_CALL(*stub, BeginTransaction(_, _))
        .WillOnce(Return(MakeTestTransaction()));
    EXPECT_CALL(*stub, PartitionRead(_, _)).WillOnce(Return(permanent_error));
  }

  StatusOr<std::vector<ReadPartition>> partitions = connection->PartitionRead(
      {{MakeReadOnlyTransaction(Transaction::ReadOnlyOptions()),
        "table",
        KeySet::All(),
        {"UserId", "UserName"}}});

  auto const& status = partitions.status();
  EXPECT_EQ(StatusCode::kPermissionDenied, status.code());
  EXPECT_THAT(status.message(), HasSubstr("uh-oh"));
}
2082
// `PartitionRead()` reports the transient error when every attempt fails with
// `kUnavailable`, both before and after the `BeginTransaction()` call.
TEST(ConnectionImplTest, PartitionReadTooManyTransientFailures) {
  auto database =
      Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  auto connection = MakeLimitedRetryConnection(database, stub);

  {
    // All expectations in this scope must be satisfied in declaration order.
    InSequence ordered;

    auto create_session =
        [&database](grpc::ClientContext&,
                    spanner_proto::BatchCreateSessionsRequest const& request) {
          EXPECT_EQ(database.FullName(), request.database());
          return MakeSessionsResponse({"test-session-name"});
        };
    EXPECT_CALL(*stub, BatchCreateSessions(_, _)).WillOnce(create_session);

    Status const transient_error(StatusCode::kUnavailable, "try-again");
    // First round of attempts, each one failing.
    EXPECT_CALL(*stub, PartitionRead(_, _))
        .Times(AtLeast(2))
        .WillRepeatedly(Return(transient_error));
    EXPECT_CALL(*stub, BeginTransaction(_, _))
        .WillOnce(Return(MakeTestTransaction()));
    // Second round of attempts, failing the same way.
    EXPECT_CALL(*stub, PartitionRead(_, _))
        .Times(AtLeast(2))
        .WillRepeatedly(Return(transient_error));
  }

  StatusOr<std::vector<ReadPartition>> partitions = connection->PartitionRead(
      {{MakeReadOnlyTransaction(Transaction::ReadOnlyOptions()),
        "table",
        KeySet::All(),
        {"UserId", "UserName"}}});

  auto const& status = partitions.status();
  EXPECT_EQ(StatusCode::kUnavailable, status.code());
  EXPECT_THAT(status.message(), HasSubstr("try-again"));
}
2117
// `PartitionQuery()` retries a transient failure, then converts the returned
// partition tokens into `QueryPartition` objects bound to the session and the
// transaction id from the response.
TEST(ConnectionImplTest, PartitionQuerySuccess) {
  auto database =
      Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  auto connection = MakeConnection(
      database, {stub}, ConnectionOptions{grpc::InsecureChannelCredentials()});

  auto create_session =
      [&database](grpc::ClientContext&,
                  spanner_proto::BatchCreateSessionsRequest const& request) {
        EXPECT_EQ(database.FullName(), request.database());
        return MakeSessionsResponse({"test-session-name"});
      };
  EXPECT_CALL(*stub, BatchCreateSessions(_, _)).WillOnce(create_session);

  // The exact request the connection is expected to send.
  auto constexpr kExpectedRequestText = R"pb(
    session: "test-session-name"
    transaction: {
      begin { read_only { strong: true return_read_timestamp: true } }
    }
    sql: "select * from table"
    params: {}
    partition_options: {}
  )pb";
  google::spanner::v1::PartitionQueryRequest expected_request;
  ASSERT_TRUE(
      TextFormat::ParseFromString(kExpectedRequestText, &expected_request));

  // The response handed back by the stub on the second attempt.
  auto constexpr kResponseText = R"pb(
    partitions: { partition_token: "BADDECAF" }
    partitions: { partition_token: "DEADBEEF" }
    transaction: { id: "CAFEDEAD" }
  )pb";
  google::spanner::v1::PartitionResponse stub_response;
  ASSERT_TRUE(TextFormat::ParseFromString(kResponseText, &stub_response));

  Status const transient_error(StatusCode::kUnavailable, "try-again");
  EXPECT_CALL(*stub, PartitionQuery(_, IsProtoEqual(expected_request)))
      .WillOnce(Return(transient_error))
      .WillOnce(Return(stub_response));

  SqlStatement statement("select * from table");
  StatusOr<std::vector<QueryPartition>> partitions = connection->PartitionQuery(
      {MakeReadOnlyTransaction(Transaction::ReadOnlyOptions()), statement});
  ASSERT_STATUS_OK(partitions);

  // One partition per token; the order of the result is not asserted.
  std::vector<QueryPartition> const expected = {
      internal::MakeQueryPartition("CAFEDEAD", "test-session-name", "BADDECAF",
                                   statement),
      internal::MakeQueryPartition("CAFEDEAD", "test-session-name", "DEADBEEF",
                                   statement)};
  EXPECT_THAT(*partitions,
              testing::UnorderedPointwise(testing::Eq(), expected));
}
2172
// `PartitionQuery()` returns the permanent error. The expected call sequence
// is: create sessions, a failing `PartitionQuery()`, a `BeginTransaction()`,
// and a second failing `PartitionQuery()`.
TEST(ConnectionImplTest, PartitionQueryPermanentFailure) {
  auto database =
      Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  auto connection = MakeLimitedRetryConnection(database, stub);

  Status permanent_error(StatusCode::kPermissionDenied, "End of line.");
  {
    // All expectations in this scope must be satisfied in declaration order.
    InSequence ordered;

    auto create_session =
        [&database](grpc::ClientContext&,
                    spanner_proto::BatchCreateSessionsRequest const& request) {
          EXPECT_EQ(database.FullName(), request.database());
          return MakeSessionsResponse({"test-session-name"});
        };
    EXPECT_CALL(*stub, BatchCreateSessions(_, _)).WillOnce(create_session);

    EXPECT_CALL(*stub, PartitionQuery(_, _)).WillOnce(Return(permanent_error));
    EXPECT_CALL(*stub, BeginTransaction(_, _))
        .WillOnce(Return(MakeTestTransaction()));
    EXPECT_CALL(*stub, PartitionQuery(_, _)).WillOnce(Return(permanent_error));
  }

  StatusOr<std::vector<QueryPartition>> partitions = connection->PartitionQuery(
      {MakeReadOnlyTransaction(Transaction::ReadOnlyOptions()),
       SqlStatement("select * from table")});

  auto const& status = partitions.status();
  EXPECT_EQ(StatusCode::kPermissionDenied, status.code());
  EXPECT_THAT(status.message(), HasSubstr(permanent_error.message()));
}
2203
// `PartitionQuery()` reports the transient error when every attempt fails
// with `kUnavailable`, both before and after the `BeginTransaction()` call.
TEST(ConnectionImplTest, PartitionQueryTooManyTransientFailures) {
  auto database =
      Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  auto connection = MakeLimitedRetryConnection(database, stub);

  Status transient_error(StatusCode::kUnavailable,
                         "try-again in PartitionQuery");
  {
    // All expectations in this scope must be satisfied in declaration order.
    InSequence ordered;

    auto create_session =
        [&database](grpc::ClientContext&,
                    spanner_proto::BatchCreateSessionsRequest const& request) {
          EXPECT_EQ(database.FullName(), request.database());
          return MakeSessionsResponse({"test-session-name"});
        };
    EXPECT_CALL(*stub, BatchCreateSessions(_, _)).WillOnce(create_session);

    // First round of attempts, each one failing.
    EXPECT_CALL(*stub, PartitionQuery(_, _))
        .Times(AtLeast(2))
        .WillRepeatedly(Return(transient_error));
    EXPECT_CALL(*stub, BeginTransaction(_, _))
        .WillOnce(Return(MakeTestTransaction()));
    // Second round of attempts, failing the same way.
    EXPECT_CALL(*stub, PartitionQuery(_, _))
        .Times(AtLeast(2))
        .WillRepeatedly(Return(transient_error));
  }

  StatusOr<std::vector<QueryPartition>> partitions = connection->PartitionQuery(
      {MakeReadOnlyTransaction(Transaction::ReadOnlyOptions()),
       SqlStatement("select * from table")});

  auto const& status = partitions.status();
  EXPECT_EQ(StatusCode::kUnavailable, status.code());
  EXPECT_THAT(status.message(), HasSubstr(transient_error.message()));
}
2237
TEST(ConnectionImplTest, MultipleThreads) {
  // Stress test: many threads issue Rollback() calls concurrently through a
  // single connection. Each call is expected to succeed and to use a session
  // whose name starts with `session_prefix`.
  auto db = Database("project", "instance", "database");
  std::string const session_prefix = "test-session-prefix-";
  // Atomic so the stub callback can number sessions from any thread.
  std::atomic<int> session_counter(0);

  auto mock = std::make_shared<spanner_testing::MockSpannerStub>();
  // Create as many uniquely named sessions as each request asks for.
  EXPECT_CALL(*mock, BatchCreateSessions(_, _))
      .WillRepeatedly(
          [&db, &session_prefix, &session_counter](
              grpc::ClientContext&,
              spanner_proto::BatchCreateSessionsRequest const& request) {
            EXPECT_EQ(db.FullName(), request.database());
            spanner_proto::BatchCreateSessionsResponse response;
            for (int i = 0; i < request.session_count(); ++i) {
              response.add_session()->set_name(
                  session_prefix + std::to_string(++session_counter));
            }
            return response;
          });
  EXPECT_CALL(*mock, Rollback(_, _))
      .WillRepeatedly(
          [session_prefix](grpc::ClientContext&,
                           spanner_proto::RollbackRequest const& request) {
            EXPECT_THAT(request.session(), StartsWith(session_prefix));
            return Status();
          });

  auto conn = MakeConnection(
      db, {mock}, ConnectionOptions{grpc::InsecureChannelCredentials()});

  int const per_thread_iterations = 1000;
  // std::thread::hardware_concurrency() returns 0 when the value is not
  // computable; fall back to a fixed thread count in that case.
  auto const thread_count = []() -> unsigned {
    auto const hardware_threads = std::thread::hardware_concurrency();
    return hardware_threads == 0 ? 16U : hardware_threads;
  }();

  // Each iteration rolls back a fresh read-write transaction with a unique
  // id derived from the thread id and the iteration number.
  auto runner = [](unsigned thread_id, int iterations, Connection* conn) {
    for (int i = 0; i != iterations; ++i) {
      auto txn = MakeReadWriteTransaction();
      SetTransactionId(
          txn, "txn-" + std::to_string(thread_id) + ":" + std::to_string(i));
      auto rollback = conn->Rollback({txn});
      EXPECT_STATUS_OK(rollback);
    }
  };

  std::vector<std::future<void>> tasks;
  tasks.reserve(thread_count);
  for (unsigned i = 0; i != thread_count; ++i) {
    tasks.push_back(std::async(std::launch::async, runner, i,
                               per_thread_iterations, conn.get()));
  }

  // Wait for every worker to finish.
  for (auto& f : tasks) {
    f.get();
  }
}
2297
2298 /**
2299 * @test Verify Transactions remain bound to a single Session.
2300 *
2301 * This test makes interleaved Read() calls using two separate Transactions,
2302 * and ensures each Transaction uses the same session consistently.
2303 */
TEST(ConnectionImplTest, TransactionSessionBinding) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();

  auto db = Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto conn = MakeConnection(
      db, {stub}, ConnectionOptions{grpc::InsecureChannelCredentials()});
  // Two sessions are handed out, in this order.
  EXPECT_CALL(*stub,
              BatchCreateSessions(_, BatchCreateSessionsRequestHasDatabase(db)))
      .WillOnce(Return(MakeSessionsResponse({"session-1"})))
      .WillOnce(Return(MakeSessionsResponse({"session-2"})));

  // Prepare one single-row stream per Read() call made below. The value in
  // row `idx` is `idx`, which lets the reads be told apart.
  constexpr int kStreamCount = 4;
  std::array<spanner_proto::PartialResultSet, kStreamCount> payloads;
  std::array<std::unique_ptr<MockGrpcReader>, kStreamCount> streams;
  for (int idx = 0; idx != kStreamCount; ++idx) {
    auto constexpr kSchema = R"pb(
      metadata: {
        row_type: {
          fields: {
            name: "Number",
            type: { code: INT64 }
          }
        }
      }
    )pb";
    auto& payload = payloads[idx];
    ASSERT_TRUE(TextFormat::ParseFromString(kSchema, &payload));
    // Only the first two streams answer a "begin" request, so only they
    // carry a transaction in their metadata.
    if (idx == 0) {
      *payload.mutable_metadata()->mutable_transaction() =
          MakeTestTransaction("ABCDEF01");
    } else if (idx == 1) {
      *payload.mutable_metadata()->mutable_transaction() =
          MakeTestTransaction("ABCDEF02");
    }
    payload.add_values()->set_string_value(std::to_string(idx));

    auto stream = absl::make_unique<MockGrpcReader>();
    EXPECT_CALL(*stream, Read(_))
        .WillOnce(DoAll(SetArgPointee<0>(payload), Return(true)))
        .WillOnce(Return(false));
    EXPECT_CALL(*stream, Finish()).WillOnce(Return(grpc::Status()));
    streams[idx] = std::move(stream);
  }

  // The StreamingRead() requests must arrive in this order, each with the
  // expected session and either "begin" or the matching transaction id.
  {
    InSequence ordered;
    EXPECT_CALL(
        *stub,
        StreamingRead(_, ReadRequestHasSessionAndBeginTransaction("session-1")))
        .WillOnce(Return(ByMove(std::move(streams[0]))));
    EXPECT_CALL(
        *stub,
        StreamingRead(_, ReadRequestHasSessionAndBeginTransaction("session-2")))
        .WillOnce(Return(ByMove(std::move(streams[1]))));
    EXPECT_CALL(*stub, StreamingRead(_, ReadRequestHasSessionAndTransactionId(
                                            "session-1", "ABCDEF01")))
        .WillOnce(Return(ByMove(std::move(streams[2]))));
    EXPECT_CALL(*stub, StreamingRead(_, ReadRequestHasSessionAndTransactionId(
                                            "session-2", "ABCDEF02")))
        .WillOnce(Return(ByMove(std::move(streams[3]))));
  }

  // Issues the same Read() for whichever transaction is supplied.
  auto read_numbers = [&conn](Transaction const& txn) {
    return conn->Read({txn, "table", KeySet::All(), {"Number"}});
  };

  // Interleave reads on the two transactions and check each result.
  Transaction first = MakeReadOnlyTransaction(Transaction::ReadOnlyOptions());
  auto rows = read_numbers(first);
  EXPECT_THAT(first, HasSessionAndTransactionId("session-1", "ABCDEF01"));
  for (auto& row : StreamOf<std::tuple<std::int64_t>>(rows)) {
    EXPECT_STATUS_OK(row);
    EXPECT_EQ(std::get<0>(*row), 0);
  }

  Transaction second = MakeReadOnlyTransaction(Transaction::ReadOnlyOptions());
  rows = read_numbers(second);
  EXPECT_THAT(second, HasSessionAndTransactionId("session-2", "ABCDEF02"));
  for (auto& row : StreamOf<std::tuple<std::int64_t>>(rows)) {
    EXPECT_STATUS_OK(row);
    EXPECT_EQ(std::get<0>(*row), 1);
  }

  rows = read_numbers(first);
  EXPECT_THAT(first, HasSessionAndTransactionId("session-1", "ABCDEF01"));
  for (auto& row : StreamOf<std::tuple<std::int64_t>>(rows)) {
    EXPECT_STATUS_OK(row);
    EXPECT_EQ(std::get<0>(*row), 2);
  }

  rows = read_numbers(second);
  EXPECT_THAT(second, HasSessionAndTransactionId("session-2", "ABCDEF02"));
  for (auto& row : StreamOf<std::tuple<std::int64_t>>(rows)) {
    EXPECT_STATUS_OK(row);
    EXPECT_EQ(std::get<0>(*row), 3);
  }
}
2402
2403 /**
2404 * @test Verify if a `Transaction` outlives the `ConnectionImpl` it was used
2405 * with, it does not call back into the deleted `ConnectionImpl` to release
2406 * the associated `Session` (which would be detected in asan/msan builds.)
2407 */
TEST(ConnectionImplTest, TransactionOutlivesConnection) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();

  auto db = Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto conn = MakeConnection(
      db, {stub}, ConnectionOptions{grpc::InsecureChannelCredentials()});

  // Hands out a single session after checking the request targets `db`.
  auto create_session =
      [&db](grpc::ClientContext&,
            spanner_proto::BatchCreateSessionsRequest const& request) {
        EXPECT_EQ(db.FullName(), request.database());
        return MakeSessionsResponse({"test-session-name"});
      };
  EXPECT_CALL(*stub, BatchCreateSessions(_, _)).WillOnce(create_session);

  // The stream delivers one response that only carries the transaction id.
  auto constexpr kMetadataText =
      R"pb(metadata: { transaction: { id: "ABCDEF00" } })pb";
  spanner_proto::PartialResultSet first_response;
  ASSERT_TRUE(TextFormat::ParseFromString(kMetadataText, &first_response));
  auto reader = absl::make_unique<MockGrpcReader>();
  EXPECT_CALL(*reader, Read(_))
      .WillOnce(DoAll(SetArgPointee<0>(first_response), Return(true)));
  EXPECT_CALL(*reader, Finish()).WillOnce(Return(grpc::Status()));
  EXPECT_CALL(*stub, StreamingRead(_, _))
      .WillOnce(Return(ByMove(std::move(reader))));

  // The read binds `txn` to the session and transaction id set up above.
  Transaction txn = MakeReadOnlyTransaction(Transaction::ReadOnlyOptions());
  auto rows = conn->Read({txn, "table", KeySet::All(), {"UserId", "UserName"}});
  EXPECT_THAT(txn, HasSessionAndTransactionId("test-session-name", "ABCDEF00"));

  // Drop `conn` while `txn` (and `rows`) are still alive; they are only
  // destroyed when the test body exits, i.e. after the connection is gone.
  conn.reset();
}
2441
TEST(ConnectionImplTest, ReadSessionNotFound) {
  // A streaming read that finishes with NOT_FOUND "Session not found" is
  // expected to surface that error and to leave the transaction marked as
  // having a bad session.
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(Return(MakeSessionsResponse({"test-session-name"})));

  // The stream yields no data and reports the error from Finish().
  auto reader = absl::make_unique<MockGrpcReader>();
  EXPECT_CALL(*reader, Read(_)).WillOnce(Return(false));
  EXPECT_CALL(*reader, Finish())
      .WillOnce(Return(
          grpc::Status(grpc::StatusCode::NOT_FOUND, "Session not found")));
  EXPECT_CALL(*stub, StreamingRead(_, _))
      .WillOnce(Return(ByMove(std::move(reader))));

  auto db = Database("project", "instance", "database");
  auto conn = MakeLimitedRetryConnection(db, stub);
  auto txn = MakeReadWriteTransaction();
  SetTransactionId(txn, "test-txn-id");

  auto row = GetSingularRow(conn->Read(Connection::ReadParams{txn}));
  EXPECT_FALSE(row.ok());
  auto const error = row.status();
  EXPECT_TRUE(IsSessionNotFound(error)) << error;
  EXPECT_THAT(txn, HasBadSession());
}
2467
TEST(ConnectionImplTest, PartitionReadSessionNotFound) {
  // PartitionRead() failing with kNotFound "Session not found" is expected
  // to surface that error and to mark the transaction's session as bad.
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(Return(MakeSessionsResponse({"test-session-name"})));
  EXPECT_CALL(*stub, PartitionRead(_, _))
      .WillOnce(Return(Status(StatusCode::kNotFound, "Session not found")));

  auto db = Database("project", "instance", "database");
  auto conn = MakeLimitedRetryConnection(db, stub);
  auto txn = MakeReadWriteTransaction();
  SetTransactionId(txn, "test-txn-id");

  auto read_params = Connection::ReadParams{txn};
  auto partitions = conn->PartitionRead({read_params});
  EXPECT_FALSE(partitions.ok());
  auto const error = partitions.status();
  EXPECT_TRUE(IsSessionNotFound(error)) << error;
  EXPECT_THAT(txn, HasBadSession());
}
2492
TEST(ConnectionImplTest, ExecuteQuerySessionNotFound) {
  // ExecuteQuery() on a stream that finishes with NOT_FOUND "Session not
  // found" is expected to surface that error and to mark the transaction's
  // session as bad.
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(Return(MakeSessionsResponse({"test-session-name"})));

  // The stream yields no data and reports the error from Finish().
  auto reader = absl::make_unique<MockGrpcReader>();
  EXPECT_CALL(*reader, Read(_)).WillOnce(Return(false));
  EXPECT_CALL(*reader, Finish())
      .WillOnce(Return(
          grpc::Status(grpc::StatusCode::NOT_FOUND, "Session not found")));
  EXPECT_CALL(*stub, ExecuteStreamingSql(_, _))
      .WillOnce(Return(ByMove(std::move(reader))));

  auto db = Database("project", "instance", "database");
  auto conn = MakeLimitedRetryConnection(db, stub);
  auto txn = MakeReadWriteTransaction();
  SetTransactionId(txn, "test-txn-id");

  auto row = GetSingularRow(conn->ExecuteQuery({txn}));
  EXPECT_FALSE(row.ok());
  auto const error = row.status();
  EXPECT_TRUE(IsSessionNotFound(error)) << error;
  EXPECT_THAT(txn, HasBadSession());
}
2517
TEST(ConnectionImplTest, ProfileQuerySessionNotFound) {
  // ProfileQuery() on a stream that finishes with NOT_FOUND "Session not
  // found" is expected to surface that error and to mark the transaction's
  // session as bad.
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(Return(MakeSessionsResponse({"test-session-name"})));

  // The stream yields no data and reports the error from Finish().
  auto reader = absl::make_unique<MockGrpcReader>();
  EXPECT_CALL(*reader, Read(_)).WillOnce(Return(false));
  EXPECT_CALL(*reader, Finish())
      .WillOnce(Return(
          grpc::Status(grpc::StatusCode::NOT_FOUND, "Session not found")));
  EXPECT_CALL(*stub, ExecuteStreamingSql(_, _))
      .WillOnce(Return(ByMove(std::move(reader))));

  auto db = Database("project", "instance", "database");
  auto conn = MakeLimitedRetryConnection(db, stub);
  auto txn = MakeReadWriteTransaction();
  SetTransactionId(txn, "test-txn-id");

  auto row = GetSingularRow(conn->ProfileQuery({txn}));
  EXPECT_FALSE(row.ok());
  auto const error = row.status();
  EXPECT_TRUE(IsSessionNotFound(error)) << error;
  EXPECT_THAT(txn, HasBadSession());
}
2542
TEST(ConnectionImplTest, ExecuteDmlSessionNotFound) {
  // ExecuteDml() whose ExecuteSql() RPC fails with kNotFound "Session not
  // found" is expected to surface that error and to mark the transaction's
  // session as bad.
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(Return(MakeSessionsResponse({"test-session-name"})));
  EXPECT_CALL(*stub, ExecuteSql(_, _))
      .WillOnce(Return(Status(StatusCode::kNotFound, "Session not found")));

  auto db = Database("project", "instance", "database");
  auto conn = MakeLimitedRetryConnection(db, stub);
  auto txn = MakeReadWriteTransaction();
  SetTransactionId(txn, "test-txn-id");

  auto result = conn->ExecuteDml({txn});
  EXPECT_FALSE(result.ok());
  auto const error = result.status();
  EXPECT_TRUE(IsSessionNotFound(error)) << error;
  EXPECT_THAT(txn, HasBadSession());
}
2566
TEST(ConnectionImplTest, ProfileDmlSessionNotFound) {
  // ProfileDml() whose ExecuteSql() RPC fails with kNotFound "Session not
  // found" is expected to surface that error and to mark the transaction's
  // session as bad.
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(Return(MakeSessionsResponse({"test-session-name"})));
  EXPECT_CALL(*stub, ExecuteSql(_, _))
      .WillOnce(Return(Status(StatusCode::kNotFound, "Session not found")));

  auto db = Database("project", "instance", "database");
  auto conn = MakeLimitedRetryConnection(db, stub);
  auto txn = MakeReadWriteTransaction();
  SetTransactionId(txn, "test-txn-id");

  auto result = conn->ProfileDml({txn});
  EXPECT_FALSE(result.ok());
  auto const error = result.status();
  EXPECT_TRUE(IsSessionNotFound(error)) << error;
  EXPECT_THAT(txn, HasBadSession());
}
2590
TEST(ConnectionImplTest, AnalyzeSqlSessionNotFound) {
  // AnalyzeSql() whose ExecuteSql() RPC fails with kNotFound "Session not
  // found" is expected to surface that error and to mark the transaction's
  // session as bad.
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(Return(MakeSessionsResponse({"test-session-name"})));
  EXPECT_CALL(*stub, ExecuteSql(_, _))
      .WillOnce(Return(Status(StatusCode::kNotFound, "Session not found")));

  auto db = Database("project", "instance", "database");
  auto conn = MakeLimitedRetryConnection(db, stub);
  auto txn = MakeReadWriteTransaction();
  SetTransactionId(txn, "test-txn-id");

  auto plan = conn->AnalyzeSql({txn});
  EXPECT_FALSE(plan.ok());
  auto const error = plan.status();
  EXPECT_TRUE(IsSessionNotFound(error)) << error;
  EXPECT_THAT(txn, HasBadSession());
}
2614
TEST(ConnectionImplTest, PartitionQuerySessionNotFound) {
  // PartitionQuery() failing with kNotFound "Session not found" is expected
  // to surface that error and to mark the transaction's session as bad.
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(Return(MakeSessionsResponse({"test-session-name"})));
  EXPECT_CALL(*stub, PartitionQuery(_, _))
      .WillOnce(Return(Status(StatusCode::kNotFound, "Session not found")));

  auto db = Database("project", "instance", "database");
  auto conn = MakeLimitedRetryConnection(db, stub);
  auto txn = MakeReadWriteTransaction();
  SetTransactionId(txn, "test-txn-id");

  auto partitions = conn->PartitionQuery({txn});
  EXPECT_FALSE(partitions.ok());
  auto const error = partitions.status();
  EXPECT_TRUE(IsSessionNotFound(error)) << error;
  EXPECT_THAT(txn, HasBadSession());
}
2638
TEST(ConnectionImplTest, ExecuteBatchDmlSessionNotFound) {
  // ExecuteBatchDml() failing with kNotFound "Session not found" is expected
  // to surface that error and to mark the transaction's session as bad.
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(Return(MakeSessionsResponse({"test-session-name"})));
  EXPECT_CALL(*stub, ExecuteBatchDml(_, _))
      .WillOnce(Return(Status(StatusCode::kNotFound, "Session not found")));

  auto db = Database("project", "instance", "database");
  auto conn = MakeLimitedRetryConnection(db, stub);
  auto txn = MakeReadWriteTransaction();
  SetTransactionId(txn, "test-txn-id");

  auto batch_result = conn->ExecuteBatchDml({txn});
  EXPECT_FALSE(batch_result.ok());
  auto const error = batch_result.status();
  EXPECT_TRUE(IsSessionNotFound(error)) << error;
  EXPECT_THAT(txn, HasBadSession());
}
2662
TEST(ConnectionImplTest, ExecutePartitionedDmlSessionNotFound) {
  // Intentionally empty: this test only documents why ExecutePartitionedDml
  // has no "session not found" coverage.
  //
  // NOTE: There's no test here because this method does not accept a
  // Transaction and so there's no way to extract the Session to check if it's
  // bad. We could modify the API to inject/extract this, but this is a
  // user-facing API that we don't want to mess up.
}
2669
TEST(ConnectionImplTest, CommitSessionNotFound) {
  // Commit() failing with kNotFound "Session not found" is expected to
  // surface that error and to mark the transaction's session as bad.
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(Return(MakeSessionsResponse({"test-session-name"})));
  EXPECT_CALL(*stub, Commit(_, _))
      .WillOnce(Return(Status(StatusCode::kNotFound, "Session not found")));

  auto db = Database("project", "instance", "database");
  auto conn = MakeLimitedRetryConnection(db, stub);
  auto txn = MakeReadWriteTransaction();
  SetTransactionId(txn, "test-txn-id");

  auto commit = conn->Commit({txn});
  EXPECT_FALSE(commit.ok());
  auto const error = commit.status();
  EXPECT_TRUE(IsSessionNotFound(error)) << error;
  EXPECT_THAT(txn, HasBadSession());
}
2692
TEST(ConnectionImplTest, RollbackSessionNotFound) {
  // Rollback() failing with kNotFound "Session not found" is expected to
  // return that error and to mark the transaction's session as bad.
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(Return(MakeSessionsResponse({"test-session-name"})));
  EXPECT_CALL(*stub, Rollback(_, _))
      .WillOnce(Return(Status(StatusCode::kNotFound, "Session not found")));

  auto db = Database("project", "instance", "database");
  auto conn = MakeLimitedRetryConnection(db, stub);
  auto txn = MakeReadWriteTransaction();
  SetTransactionId(txn, "test-txn-id");

  auto const error = conn->Rollback({txn});
  EXPECT_TRUE(IsSessionNotFound(error)) << error;
  EXPECT_THAT(txn, HasBadSession());
}
2714
TEST(ConnectionImplTest, OperationsFailOnInvalidatedTransaction) {
  // No expectations are installed on the stub: every operation below is
  // expected to fail with the error that invalidated the transaction,
  // without making an RPC.
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();

  auto db = Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto conn = MakeConnection(
      db, {stub}, ConnectionOptions{grpc::InsecureChannelCredentials()});

  // Invalidate the transaction up front.
  auto txn = MakeReadWriteTransaction();
  SetTransactionInvalid(
      txn, Status(StatusCode::kInvalidArgument, "BeginTransaction failed"));

  // Builds a fresh matcher for the invalidating error on each use.
  auto is_invalidated = [] {
    return StatusIs(StatusCode::kInvalidArgument,
                    HasSubstr("BeginTransaction failed"));
  };

  EXPECT_THAT(
      conn->Read({txn, "table", KeySet::All(), {"Column"}}).begin()->status(),
      is_invalidated());

  EXPECT_THAT(conn->PartitionRead({{txn, "table", KeySet::All(), {"Column"}}}),
              is_invalidated());

  EXPECT_THAT(
      conn->ExecuteQuery({txn, SqlStatement("select 1")}).begin()->status(),
      is_invalidated());

  EXPECT_THAT(conn->ExecuteDml({txn, SqlStatement("delete * from table")}),
              is_invalidated());

  EXPECT_THAT(
      conn->ProfileQuery({txn, SqlStatement("select 1")}).begin()->status(),
      is_invalidated());

  EXPECT_THAT(conn->ProfileDml({txn, SqlStatement("delete * from table")}),
              is_invalidated());

  EXPECT_THAT(conn->AnalyzeSql({txn, SqlStatement("select * from table")}),
              is_invalidated());

  // ExecutePartitionedDml creates its own transaction so it's not tested here.

  EXPECT_THAT(conn->PartitionQuery({txn, SqlStatement("select * from table")}),
              is_invalidated());

  EXPECT_THAT(conn->ExecuteBatchDml({txn}), is_invalidated());

  EXPECT_THAT(conn->Commit({txn}), is_invalidated());

  EXPECT_THAT(conn->Rollback({txn}), is_invalidated());
}
2778
2779 #if defined(__GNUC__) || defined(__clang__)
2780 #pragma GCC diagnostic pop
2781 #endif
2782
2783 } // namespace
2784 } // namespace internal
2785 } // namespace SPANNER_CLIENT_NS
2786 } // namespace spanner
2787 } // namespace cloud
2788 } // namespace google
2789