1 // Copyright 2019 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "google/cloud/spanner/internal/connection_impl.h"
16 #include "google/cloud/spanner/client.h"
17 #include "google/cloud/spanner/internal/spanner_stub.h"
18 #include "google/cloud/spanner/testing/matchers.h"
19 #include "google/cloud/spanner/testing/mock_spanner_stub.h"
20 #include "google/cloud/testing_util/assert_ok.h"
21 #include "google/cloud/testing_util/is_proto_equal.h"
22 #include "google/cloud/testing_util/status_matchers.h"
23 #include "absl/memory/memory.h"
24 #include "absl/types/optional.h"
25 #include <google/protobuf/text_format.h>
26 #include <gmock/gmock.h>
27 #include <array>
28 #include <atomic>
29 #include <chrono>
30 #include <future>
31 #include <string>
32 #include <thread>
33 #include <vector>
34 
35 namespace google {
36 namespace cloud {
37 namespace spanner {
38 inline namespace SPANNER_CLIENT_NS {
39 namespace internal {
40 namespace {
41 
42 // We compile with -Wextra which enables -Wmissing-field-initializers. This
43 // warning triggers when aggregate initialization is used with too few
44 // arguments. For example
45 //
46 //   struct A { int a; int b; int c; };  // 3 fields
47 //   A a = {1, 2};  // <-- Warning, missing initializer for A::c.
48 //
49 // To make the test code in this file more readable, we disable this warning
// and rely on the guaranteed behavior of aggregate initialization.
51 // https://en.cppreference.com/w/cpp/language/aggregate_initialization
52 // Note: "pragma GCC" works for both GCC and clang. MSVC doesn't warn.
53 #if defined(__GNUC__) || defined(__clang__)
54 #pragma GCC diagnostic push
55 #pragma GCC diagnostic ignored "-Wmissing-field-initializers"
56 #endif
57 
58 using ::google::cloud::spanner_testing::HasSessionAndTransactionId;
59 using ::google::cloud::testing_util::IsProtoEqual;
60 using ::google::cloud::testing_util::StatusIs;
61 using ::google::protobuf::TextFormat;
62 using ::testing::_;
63 using ::testing::AtLeast;
64 using ::testing::ByMove;
65 using ::testing::DoAll;
66 using ::testing::Eq;
67 using ::testing::HasSubstr;
68 using ::testing::InSequence;
69 using ::testing::Property;
70 using ::testing::Return;
71 using ::testing::Sequence;
72 using ::testing::SetArgPointee;
73 using ::testing::StartsWith;
74 using ::testing::UnorderedPointwise;
75 
76 namespace spanner_proto = ::google::spanner::v1;
77 
78 // Matches a spanner_proto::ReadRequest with the specified `session` and
79 // `transaction_id`.
80 MATCHER_P2(ReadRequestHasSessionAndTransactionId, session, transaction_id,
81            "ReadRequest has expected session and transaction") {
82   return arg.session() == session && arg.transaction().id() == transaction_id;
83 }
84 
85 // Matches a spanner_proto::ReadRequest with the specified `session` and
86 // 'begin` set in the TransactionSelector.
87 MATCHER_P(ReadRequestHasSessionAndBeginTransaction, session,
88           "ReadRequest has expected session and transaction") {
89   return arg.session() == session && arg.transaction().has_begin();
90 }
91 
92 // Matches a spanner_proto::BatchCreateSessionsRequest with the specified
93 // `database`.
94 MATCHER_P(BatchCreateSessionsRequestHasDatabase, database,
95           "BatchCreateSessionsRequest has expected database") {
96   return arg.database() == database.FullName();
97 }
98 
99 // Matches a spanner::Transaction that is bound to a "bad" session"
100 MATCHER(HasBadSession, "bound to a session that's marked bad") {
101   return internal::Visit(
102       arg,
103       [&](internal::SessionHolder& session,
__anonf3d7660c0202(internal::SessionHolder& session, StatusOr<google::spanner::v1::TransactionSelector>&, std::int64_t) 104           StatusOr<google::spanner::v1::TransactionSelector>&, std::int64_t) {
105         if (!session) {
106           *result_listener << "has no session";
107           return false;
108         }
109         if (!session->is_bad()) {
110           *result_listener << "session expected to be bad, but was not";
111           return false;
112         }
113         return true;
114       });
115 }
116 
117 // Helper to set the Transaction's ID. Requires selector.ok().
SetTransactionId(Transaction & txn,std::string tid)118 void SetTransactionId(Transaction& txn, std::string tid) {
119   internal::Visit(txn,
120                   [&tid](SessionHolder&,
121                          StatusOr<spanner_proto::TransactionSelector>& selector,
122                          std::int64_t) {
123                     selector->set_id(std::move(tid));
124                     return 0;
125                   });
126 }
127 
128 // Helper to mark the Transaction as invalid.
SetTransactionInvalid(Transaction & txn,Status status)129 void SetTransactionInvalid(Transaction& txn, Status status) {
130   internal::Visit(
131       txn, [&status](SessionHolder&,
132                      StatusOr<spanner_proto::TransactionSelector>& selector,
133                      std::int64_t) {
134         selector = std::move(status);
135         return 0;
136       });
137 }
138 
139 // Helper to create a Transaction proto with a specified (or default) id.
MakeTestTransaction(std::string id="1234567890")140 spanner_proto::Transaction MakeTestTransaction(std::string id = "1234567890") {
141   spanner_proto::Transaction txn;
142   txn.set_id(std::move(id));
143   return txn;
144 }
145 
146 // Create a response with the given `sessions`
MakeSessionsResponse(std::vector<std::string> sessions)147 spanner_proto::BatchCreateSessionsResponse MakeSessionsResponse(
148     std::vector<std::string> sessions) {
149   spanner_proto::BatchCreateSessionsResponse response;
150   for (auto& session : sessions) {
151     response.add_session()->set_name(std::move(session));
152   }
153   return response;
154 }
155 
156 // Create a `Connection` suitable for use in tests that continue retrying
157 // until the retry policy is exhausted - attempting that with the default
158 // policies would take too long (10 minutes).
159 // Other tests can use this method or just call `MakeConnection()` directly.
MakeLimitedRetryConnection(Database const & db,std::shared_ptr<spanner_testing::MockSpannerStub> mock)160 std::shared_ptr<Connection> MakeLimitedRetryConnection(
161     Database const& db,
162     std::shared_ptr<spanner_testing::MockSpannerStub> mock) {
163   return MakeConnection(
164       db, {std::move(mock)},
165       ConnectionOptions{grpc::InsecureChannelCredentials()},
166       SessionPoolOptions{},
167       LimitedErrorCountRetryPolicy(/*maximum_failures=*/2).clone(),
168       ExponentialBackoffPolicy(/*initial_delay=*/std::chrono::microseconds(1),
169                                /*maximum_delay=*/std::chrono::microseconds(1),
170                                /*scaling=*/2.0)
171           .clone());
172 }
173 
174 class MockGrpcReader
175     : public ::grpc::ClientReaderInterface<spanner_proto::PartialResultSet> {
176  public:
177   MOCK_METHOD1(Read, bool(spanner_proto::PartialResultSet*));
178   MOCK_METHOD1(NextMessageSize, bool(std::uint32_t*));
179   MOCK_METHOD0(Finish, grpc::Status());
180   MOCK_METHOD0(WaitForInitialMetadata, void());
181 };
182 
/// @test Verify that Read() reports the error returned when session creation
/// (BatchCreateSessions) fails.
TEST(ConnectionImplTest, ReadGetSessionFailure) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();

  Database const database("dummy_project", "dummy_instance",
                          "dummy_database_id");
  auto connection = MakeConnection(
      database, {stub}, ConnectionOptions{grpc::InsecureChannelCredentials()});
  // Session creation fails with a permanent error.
  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(
          [&database](
              grpc::ClientContext&,
              spanner_proto::BatchCreateSessionsRequest const& request) {
            EXPECT_EQ(database.FullName(), request.database());
            return Status(StatusCode::kPermissionDenied, "uh-oh in GetSession");
          });

  auto rows = connection->Read(
      {MakeSingleUseTransaction(Transaction::ReadOnlyOptions()),
       "table",
       KeySet::All(),
       {"column1"}});
  for (auto& row : rows) {
    auto const& status = row.status();
    EXPECT_EQ(StatusCode::kPermissionDenied, status.code());
    EXPECT_THAT(status.message(), HasSubstr("uh-oh in GetSession"));
  }
}
207 
/// @test Verify that Read() reports the error status produced when the
/// streaming reader finishes with a failure.
TEST(ConnectionImplTest, ReadStreamingReadFailure) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();

  Database const database("dummy_project", "dummy_instance",
                          "dummy_database_id");
  auto connection = MakeConnection(
      database, {stub}, ConnectionOptions{grpc::InsecureChannelCredentials()});

  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(
          [&database](
              grpc::ClientContext&,
              spanner_proto::BatchCreateSessionsRequest const& request) {
            EXPECT_EQ(database.FullName(), request.database());
            return MakeSessionsResponse({"test-session-name"});
          });

  // The stream yields no messages and then finishes with an error.
  auto reader = absl::make_unique<MockGrpcReader>();
  EXPECT_CALL(*reader, Read(_)).WillOnce(Return(false));
  EXPECT_CALL(*reader, Finish())
      .WillOnce(Return(grpc::Status(grpc::StatusCode::PERMISSION_DENIED,
                                    "uh-oh in GrpcReader::Finish")));
  EXPECT_CALL(*stub, StreamingRead(_, _))
      .WillOnce(Return(ByMove(std::move(reader))));

  auto rows = connection->Read(
      {MakeSingleUseTransaction(Transaction::ReadOnlyOptions()),
       "table",
       KeySet::All(),
       {"column1"}});
  for (auto& row : rows) {
    auto const& status = row.status();
    EXPECT_EQ(StatusCode::kPermissionDenied, status.code());
    EXPECT_THAT(status.message(), HasSubstr("uh-oh in GrpcReader::Finish"));
  }
}
242 
/// @test Verify that Read() recovers from a transient stream failure (the
/// first reader finishes with UNAVAILABLE) and then returns the expected rows
/// from the second stream.
TEST(ConnectionImplTest, ReadSuccess) {
  auto mock = std::make_shared<spanner_testing::MockSpannerStub>();

  auto db = Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto conn = MakeConnection(
      db, {mock}, ConnectionOptions{grpc::InsecureChannelCredentials()});
  EXPECT_CALL(*mock, BatchCreateSessions(_, _))
      .WillOnce(
          [&db](grpc::ClientContext&,
                spanner_proto::BatchCreateSessionsRequest const& request) {
            EXPECT_EQ(db.FullName(), request.database());
            return MakeSessionsResponse({"test-session-name"});
          });

  // First stream: no data, finishes with a transient error.
  auto reader1 = absl::make_unique<MockGrpcReader>();
  EXPECT_CALL(*reader1, Read(_)).WillOnce(Return(false));
  EXPECT_CALL(*reader1, Finish())
      .WillOnce(
          Return(grpc::Status(grpc::StatusCode::UNAVAILABLE, "try-again")));

  // Second stream: a single message holding two rows, then a clean finish.
  auto reader2 = absl::make_unique<MockGrpcReader>();
  auto constexpr kText = R"pb(
    metadata: {
      row_type: {
        fields: {
          name: "UserId",
          type: { code: INT64 }
        }
        fields: {
          name: "UserName",
          type: { code: STRING }
        }
      }
    }
    values: { string_value: "12" }
    values: { string_value: "Steve" }
    values: { string_value: "42" }
    values: { string_value: "Ann" }
  )pb";
  spanner_proto::PartialResultSet response;
  ASSERT_TRUE(TextFormat::ParseFromString(kText, &response));
  EXPECT_CALL(*reader2, Read(_))
      .WillOnce(DoAll(SetArgPointee<0>(response), Return(true)))
      .WillOnce(Return(false));
  EXPECT_CALL(*reader2, Finish()).WillOnce(Return(grpc::Status()));
  EXPECT_CALL(*mock, StreamingRead(_, _))
      .WillOnce(Return(ByMove(std::move(reader1))))
      .WillOnce(Return(ByMove(std::move(reader2))));

  auto rows =
      conn->Read({MakeSingleUseTransaction(Transaction::ReadOnlyOptions()),
                  "table",
                  KeySet::All(),
                  {"UserId", "UserName"}});
  using RowType = std::tuple<std::int64_t, std::string>;
  auto expected = std::vector<RowType>{
      RowType(12, "Steve"),
      RowType(42, "Ann"),
  };
  std::size_t row_number = 0;
  for (auto& row : StreamOf<RowType>(rows)) {
    // Use fatal assertions here: dereferencing a failed row, or indexing
    // `expected` past its end, would be undefined behavior rather than a
    // clean test failure.
    ASSERT_STATUS_OK(row);
    ASSERT_LT(row_number, expected.size());
    EXPECT_EQ(*row, expected[row_number]);
    ++row_number;
  }
  EXPECT_EQ(row_number, expected.size());
}
310 
/// @test Verify that Read() reports a permanent stream error; the stub
/// expects exactly one StreamingRead call.
TEST(ConnectionImplTest, ReadPermanentFailure) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();

  Database const database("dummy_project", "dummy_instance",
                          "dummy_database_id");
  auto connection = MakeLimitedRetryConnection(database, stub);
  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(
          [&database](
              grpc::ClientContext&,
              spanner_proto::BatchCreateSessionsRequest const& request) {
            EXPECT_EQ(database.FullName(), request.database());
            return MakeSessionsResponse({"test-session-name"});
          });

  // The only stream finishes with a non-retryable error.
  auto failing_reader = absl::make_unique<MockGrpcReader>();
  EXPECT_CALL(*failing_reader, Read(_)).WillOnce(Return(false));
  grpc::Status const finish_status(grpc::StatusCode::PERMISSION_DENIED,
                                   "uh-oh");
  EXPECT_CALL(*failing_reader, Finish()).WillOnce(Return(finish_status));
  EXPECT_CALL(*stub, StreamingRead(_, _))
      .WillOnce(Return(ByMove(std::move(failing_reader))));

  auto rows = connection->Read(
      {MakeSingleUseTransaction(Transaction::ReadOnlyOptions()),
       "table",
       KeySet::All(),
       {"UserId", "UserName"}});
  for (auto& row : rows) {
    auto const& status = row.status();
    EXPECT_EQ(StatusCode::kPermissionDenied, status.code());
    EXPECT_THAT(status.message(), HasSubstr("uh-oh"));
  }
}
342 
/// @test Verify that Read() gives up and reports the transient error once
/// the limited retry policy is exhausted.
TEST(ConnectionImplTest, ReadTooManyTransientFailures) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();

  Database const database("dummy_project", "dummy_instance",
                          "dummy_database_id");
  auto connection = MakeLimitedRetryConnection(database, stub);
  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(
          [&database](
              grpc::ClientContext&,
              spanner_proto::BatchCreateSessionsRequest const& request) {
            EXPECT_EQ(database.FullName(), request.database());
            return MakeSessionsResponse({"test-session-name"});
          });

  // Every attempt gets a fresh reader that fails with a transient error.
  auto make_failing_reader = [](grpc::ClientContext&,
                                spanner_proto::ReadRequest const&) {
    auto failing_reader = absl::make_unique<MockGrpcReader>();
    EXPECT_CALL(*failing_reader, Read(_)).WillOnce(Return(false));
    EXPECT_CALL(*failing_reader, Finish())
        .WillOnce(
            Return(grpc::Status(grpc::StatusCode::UNAVAILABLE, "try-again")));
    return failing_reader;
  };
  EXPECT_CALL(*stub, StreamingRead(_, _))
      .Times(AtLeast(2))
      .WillRepeatedly(make_failing_reader);

  auto rows = connection->Read(
      {MakeSingleUseTransaction(Transaction::ReadOnlyOptions()),
       "table",
       KeySet::All(),
       {"UserId", "UserName"}});
  for (auto& row : rows) {
    auto const& status = row.status();
    EXPECT_EQ(StatusCode::kUnavailable, status.code());
    EXPECT_THAT(status.message(), HasSubstr("try-again"));
  }
}
378 
379 /// @test Verify implicit "begin transaction" in Read() works.
TEST(ConnectionImplTest,ReadImplicitBeginTransaction)380 TEST(ConnectionImplTest, ReadImplicitBeginTransaction) {
381   auto mock = std::make_shared<spanner_testing::MockSpannerStub>();
382 
383   auto db = Database("dummy_project", "dummy_instance", "dummy_database_id");
384   auto conn = MakeConnection(
385       db, {mock}, ConnectionOptions{grpc::InsecureChannelCredentials()});
386   EXPECT_CALL(*mock, BatchCreateSessions(_, _))
387       .WillOnce(
388           [&db](grpc::ClientContext&,
389                 spanner_proto::BatchCreateSessionsRequest const& request) {
390             EXPECT_EQ(db.FullName(), request.database());
391             return MakeSessionsResponse({"test-session-name"});
392           });
393 
394   auto grpc_reader = absl::make_unique<MockGrpcReader>();
395   auto constexpr kText = R"pb(metadata: { transaction: { id: "ABCDEF00" } })pb";
396   spanner_proto::PartialResultSet response;
397   ASSERT_TRUE(TextFormat::ParseFromString(kText, &response));
398   EXPECT_CALL(*grpc_reader, Read(_))
399       .WillOnce(DoAll(SetArgPointee<0>(response), Return(true)))
400       .WillOnce(Return(false));
401   EXPECT_CALL(*grpc_reader, Finish()).WillOnce(Return(grpc::Status()));
402   EXPECT_CALL(*mock, StreamingRead(_, _))
403       .WillOnce(Return(ByMove(std::move(grpc_reader))));
404 
405   Transaction txn = MakeReadOnlyTransaction(Transaction::ReadOnlyOptions());
406   auto rows = conn->Read({txn, "table", KeySet::All(), {"UserId", "UserName"}});
407   for (auto& row : rows) {
408     EXPECT_STATUS_OK(row);
409   }
410   EXPECT_THAT(txn, HasSessionAndTransactionId("test-session-name", "ABCDEF00"));
411 }
412 
/// @test Verify the call sequence when a Read() with an implicit "begin
/// transaction" fails permanently: the stub expects an explicit
/// BeginTransaction followed by a second StreamingRead using the returned
/// transaction id, and the permanent error is reported to the caller.
TEST(ConnectionImplTest, ReadImplicitBeginTransactionPermanentFailure) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();

  Database const database("dummy_project", "dummy_instance",
                          "dummy_database_id");
  auto connection = MakeLimitedRetryConnection(database, stub);

  // Builds a reader whose stream is empty and finishes with a permanent
  // error.
  auto make_failing_reader = [] {
    auto failing = absl::make_unique<MockGrpcReader>();
    grpc::Status finish_status(grpc::StatusCode::PERMISSION_DENIED, "uh-oh");
    EXPECT_CALL(*failing, Read(_)).WillOnce(Return(false));
    EXPECT_CALL(*failing, Finish()).WillOnce(Return(finish_status));
    return failing;
  };
  auto implicit_begin_reader = make_failing_reader();
  auto explicit_txn_reader = make_failing_reader();
  // n.b. these calls are explicitly sequenced because using the scoped
  // `InSequence` object causes gmock to get confused by the reader calls.
  Sequence call_order;
  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .InSequence(call_order)
      .WillOnce(
          [&database](
              grpc::ClientContext&,
              spanner_proto::BatchCreateSessionsRequest const& request) {
            EXPECT_EQ(database.FullName(), request.database());
            return MakeSessionsResponse({"test-session-name"});
          });
  EXPECT_CALL(*stub, StreamingRead(_, ReadRequestHasSessionAndBeginTransaction(
                                          "test-session-name")))
      .InSequence(call_order)
      .WillOnce(Return(ByMove(std::move(implicit_begin_reader))));
  EXPECT_CALL(*stub, BeginTransaction(_, _))
      .InSequence(call_order)
      .WillOnce(Return(MakeTestTransaction("FEDCBA98")));
  EXPECT_CALL(*stub, StreamingRead(_, ReadRequestHasSessionAndTransactionId(
                                          "test-session-name", "FEDCBA98")))
      .InSequence(call_order)
      .WillOnce(Return(ByMove(std::move(explicit_txn_reader))));

  Transaction txn = MakeReadOnlyTransaction(Transaction::ReadOnlyOptions());
  auto rows =
      connection->Read({txn, "table", KeySet::All(), {"UserId", "UserName"}});
  for (auto& row : rows) {
    EXPECT_THAT(row,
                StatusIs(StatusCode::kPermissionDenied, HasSubstr("uh-oh")));
  }
}
458 
/// @test Verify that ExecuteQuery() reports the error returned when session
/// creation (BatchCreateSessions) fails.
TEST(ConnectionImplTest, ExecuteQueryGetSessionFailure) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  Database const database("dummy_project", "dummy_instance",
                          "dummy_database_id");
  auto connection = MakeConnection(
      database, {stub}, ConnectionOptions{grpc::InsecureChannelCredentials()});
  // Session creation fails with a permanent error.
  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(
          [&database](
              grpc::ClientContext&,
              spanner_proto::BatchCreateSessionsRequest const& request) {
            EXPECT_EQ(database.FullName(), request.database());
            return Status(StatusCode::kPermissionDenied, "uh-oh in GetSession");
          });

  auto rows = connection->ExecuteQuery(
      {MakeSingleUseTransaction(Transaction::ReadOnlyOptions()),
       SqlStatement("select * from table")});
  for (auto& row : rows) {
    auto const& status = row.status();
    EXPECT_EQ(StatusCode::kPermissionDenied, status.code());
    EXPECT_THAT(status.message(), HasSubstr("uh-oh in GetSession"));
  }
}
480 
/// @test Verify that ExecuteQuery() reports the error status produced when
/// the streaming reader finishes with a failure.
TEST(ConnectionImplTest, ExecuteQueryStreamingReadFailure) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();

  Database const database("dummy_project", "dummy_instance",
                          "dummy_database_id");
  auto connection = MakeConnection(
      database, {stub}, ConnectionOptions{grpc::InsecureChannelCredentials()});
  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(
          [&database](
              grpc::ClientContext&,
              spanner_proto::BatchCreateSessionsRequest const& request) {
            EXPECT_EQ(database.FullName(), request.database());
            return MakeSessionsResponse({"test-session-name"});
          });

  // The stream yields no messages and then finishes with an error.
  auto reader = absl::make_unique<MockGrpcReader>();
  EXPECT_CALL(*reader, Read(_)).WillOnce(Return(false));
  EXPECT_CALL(*reader, Finish())
      .WillOnce(Return(grpc::Status(grpc::StatusCode::PERMISSION_DENIED,
                                    "uh-oh in GrpcReader::Finish")));
  EXPECT_CALL(*stub, ExecuteStreamingSql(_, _))
      .WillOnce(Return(ByMove(std::move(reader))));

  auto rows = connection->ExecuteQuery(
      {MakeSingleUseTransaction(Transaction::ReadOnlyOptions()),
       SqlStatement("select * from table")});
  for (auto& row : rows) {
    auto const& status = row.status();
    EXPECT_EQ(StatusCode::kPermissionDenied, status.code());
    EXPECT_THAT(status.message(), HasSubstr("uh-oh in GrpcReader::Finish"));
  }
}
512 
/// @test Verify that ExecuteQuery() returns the rows delivered by the
/// ExecuteStreamingSql stream, converted to the requested tuple type.
TEST(ConnectionImplTest, ExecuteQueryReadSuccess) {
  auto mock = std::make_shared<spanner_testing::MockSpannerStub>();

  auto db = Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto conn = MakeConnection(
      db, {mock}, ConnectionOptions{grpc::InsecureChannelCredentials()});
  EXPECT_CALL(*mock, BatchCreateSessions(_, _))
      .WillOnce(
          [&db](grpc::ClientContext&,
                spanner_proto::BatchCreateSessionsRequest const& request) {
            EXPECT_EQ(db.FullName(), request.database());
            return MakeSessionsResponse({"test-session-name"});
          });

  // A single message holding two rows, followed by a clean finish.
  auto grpc_reader = absl::make_unique<MockGrpcReader>();
  auto constexpr kText = R"pb(
    metadata: {
      row_type: {
        fields: {
          name: "UserId",
          type: { code: INT64 }
        }
        fields: {
          name: "UserName",
          type: { code: STRING }
        }
      }
    }
    values: { string_value: "12" }
    values: { string_value: "Steve" }
    values: { string_value: "42" }
    values: { string_value: "Ann" }
  )pb";
  spanner_proto::PartialResultSet response;
  ASSERT_TRUE(TextFormat::ParseFromString(kText, &response));
  EXPECT_CALL(*grpc_reader, Read(_))
      .WillOnce(DoAll(SetArgPointee<0>(response), Return(true)))
      .WillOnce(Return(false));
  EXPECT_CALL(*grpc_reader, Finish()).WillOnce(Return(grpc::Status()));
  EXPECT_CALL(*mock, ExecuteStreamingSql(_, _))
      .WillOnce(Return(ByMove(std::move(grpc_reader))));

  auto rows = conn->ExecuteQuery(
      {MakeSingleUseTransaction(Transaction::ReadOnlyOptions()),
       SqlStatement("select * from table")});
  using RowType = std::tuple<std::int64_t, std::string>;
  auto expected = std::vector<RowType>{
      RowType(12, "Steve"),
      RowType(42, "Ann"),
  };
  std::size_t row_number = 0;
  for (auto& row : StreamOf<RowType>(rows)) {
    // Use fatal assertions here: dereferencing a failed row, or indexing
    // `expected` past its end, would be undefined behavior rather than a
    // clean test failure.
    ASSERT_STATUS_OK(row);
    ASSERT_LT(row_number, expected.size());
    EXPECT_EQ(*row, expected[row_number]);
    ++row_number;
  }
  EXPECT_EQ(row_number, expected.size());
}
571 
572 /// @test Verify implicit "begin transaction" in ExecuteQuery() works.
TEST(ConnectionImplTest,ExecuteQueryImplicitBeginTransaction)573 TEST(ConnectionImplTest, ExecuteQueryImplicitBeginTransaction) {
574   auto mock = std::make_shared<spanner_testing::MockSpannerStub>();
575 
576   auto db = Database("dummy_project", "dummy_instance", "dummy_database_id");
577   auto conn = MakeConnection(
578       db, {mock}, ConnectionOptions{grpc::InsecureChannelCredentials()});
579   EXPECT_CALL(*mock, BatchCreateSessions(_, _))
580       .WillOnce(
581           [&db](grpc::ClientContext&,
582                 spanner_proto::BatchCreateSessionsRequest const& request) {
583             EXPECT_EQ(db.FullName(), request.database());
584             return MakeSessionsResponse({"test-session-name"});
585           });
586 
587   auto grpc_reader = absl::make_unique<MockGrpcReader>();
588   auto constexpr kText = R"pb(metadata: { transaction: { id: "00FEDCBA" } })pb";
589   spanner_proto::PartialResultSet response;
590   ASSERT_TRUE(TextFormat::ParseFromString(kText, &response));
591   EXPECT_CALL(*grpc_reader, Read(_))
592       .WillOnce(DoAll(SetArgPointee<0>(response), Return(true)))
593       .WillOnce(Return(false));
594   EXPECT_CALL(*grpc_reader, Finish()).WillOnce(Return(grpc::Status()));
595   EXPECT_CALL(*mock, ExecuteStreamingSql(_, _))
596       .WillOnce(Return(ByMove(std::move(grpc_reader))));
597 
598   Transaction txn = MakeReadOnlyTransaction(Transaction::ReadOnlyOptions());
599   auto rows = conn->ExecuteQuery({txn, SqlStatement("select * from table")});
600   for (auto& row : rows) {
601     EXPECT_STATUS_OK(row);
602   }
603   EXPECT_THAT(txn, HasSessionAndTransactionId("test-session-name", "00FEDCBA"));
604 }
605 
/// @test Verify that the optimizer version set in `QueryOptions` is copied
/// into the `query_options` of every request sent by the Connection methods
/// that take `SqlParams`. The check runs for an unset, an empty, and a
/// non-empty optimizer version.
TEST(ConnectionImplTest, QueryOptions) {
  // We wrap MockGrpcReader in NiceMock, because we don't really care how
  // it's called in this test (aside from needing to return a transaction in
  // the first call), and we want to minimize GMock's "uninteresting mock
  // function call" warnings.
  using ::testing::NiceMock;

  auto constexpr kQueryOptionsProp =
      &spanner_proto::ExecuteSqlRequest::query_options;
  std::vector<absl::optional<std::string>> const optimizer_versions = {
      {}, "", "some-version"};

  for (auto const& version : optimizer_versions) {
    // The proto we expect to find in each outgoing request.
    spanner_proto::ExecuteSqlRequest::QueryOptions expected_options;
    if (version.has_value()) {
      expected_options.set_optimizer_version(*version);
    }
    auto request_matcher =
        Property(kQueryOptionsProp, IsProtoEqual(expected_options));

    auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
    EXPECT_CALL(*stub, BatchCreateSessions(_, _))
        .WillOnce(Return(MakeSessionsResponse({"session-name"})));

    auto txn = MakeReadOnlyTransaction(Transaction::ReadOnlyOptions());
    auto query_options = QueryOptions().set_optimizer_version(version);
    auto params = Connection::SqlParams{txn, SqlStatement{}, query_options};
    Database const database("dummy_project", "dummy_instance",
                            "dummy_database_id");
    auto connection = MakeConnection(
        database, {stub},
        ConnectionOptions{grpc::InsecureChannelCredentials()});

    // The first stream returns metadata carrying a transaction id.
    auto constexpr kResponseText =
        R"pb(metadata: { transaction: { id: "ABCDEF00" } })pb";
    spanner_proto::PartialResultSet response;
    ASSERT_TRUE(TextFormat::ParseFromString(kResponseText, &response));
    auto first_reader = absl::make_unique<NiceMock<MockGrpcReader>>();
    EXPECT_CALL(*first_reader, Read(_))
        .WillOnce(DoAll(SetArgPointee<0>(response), Return(true)));

    // Calls the 5 Connection::* methods that take SqlParams and ensures that
    // the protos being sent contain the expected options.
    EXPECT_CALL(*stub, ExecuteStreamingSql(_, request_matcher))
        .WillOnce(Return(ByMove(std::move(first_reader))))
        .WillOnce(
            Return(ByMove(absl::make_unique<NiceMock<MockGrpcReader>>())));
    (void)connection->ExecuteQuery(params);
    (void)connection->ProfileQuery(params);

    EXPECT_CALL(*stub, ExecuteSql(_, request_matcher)).Times(3);
    (void)connection->ExecuteDml(params);
    (void)connection->ProfileDml(params);
    (void)connection->AnalyzeSql(params);
  }
}
655 
/// @test Verify that ExecuteDml() reports the error returned when session
/// creation (BatchCreateSessions) fails.
TEST(ConnectionImplTest, ExecuteDmlGetSessionFailure) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  Database const database("dummy_project", "dummy_instance",
                          "dummy_database_id");
  auto connection = MakeConnection(
      database, {stub}, ConnectionOptions{grpc::InsecureChannelCredentials()});
  // Session creation fails with a permanent error.
  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(
          [&database](
              grpc::ClientContext&,
              spanner_proto::BatchCreateSessionsRequest const& request) {
            EXPECT_EQ(database.FullName(), request.database());
            return Status(StatusCode::kPermissionDenied, "uh-oh in GetSession");
          });

  Transaction txn = MakeReadWriteTransaction(Transaction::ReadWriteOptions());
  auto outcome =
      connection->ExecuteDml({txn, SqlStatement("delete * from table")});

  auto const& status = outcome.status();
  EXPECT_EQ(StatusCode::kPermissionDenied, status.code());
  EXPECT_THAT(status.message(), HasSubstr("uh-oh in GetSession"));
}
675 
/// @test Verify that ExecuteDml() retries a transient ExecuteSql failure and
/// then reports the modified row count from the successful response.
TEST(ConnectionImplTest, ExecuteDmlDeleteSuccess) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  Database const database("dummy_project", "dummy_instance",
                          "dummy_database_id");
  auto connection = MakeConnection(
      database, {stub}, ConnectionOptions{grpc::InsecureChannelCredentials()});

  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(Return(MakeSessionsResponse({"session-name"})));

  auto constexpr kResultText = R"pb(
    metadata: { transaction: { id: "1234567890" } }
    stats: { row_count_exact: 42 }
  )pb";
  spanner_proto::ResultSet success;
  ASSERT_TRUE(TextFormat::ParseFromString(kResultText, &success));

  // First attempt fails transiently, second attempt succeeds.
  Status const transient(StatusCode::kUnavailable, "try-again");
  EXPECT_CALL(*stub, ExecuteSql(_, _))
      .WillOnce(Return(transient))
      .WillOnce(Return(success));
  Transaction txn = MakeReadWriteTransaction(Transaction::ReadWriteOptions());
  auto outcome =
      connection->ExecuteDml({txn, SqlStatement("delete * from table")});

  ASSERT_STATUS_OK(outcome);
  EXPECT_EQ(outcome->RowsModified(), 42);
}
701 
/// @test Verify the call sequence when ExecuteDml() fails permanently: after
/// the first ExecuteSql fails, the stub expects a BeginTransaction and one
/// more ExecuteSql, and the permanent error is reported to the caller.
TEST(ConnectionImplTest, ExecuteDmlDeletePermanentFailure) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  Database const database("dummy_project", "dummy_instance",
                          "dummy_database_id");
  auto connection = MakeConnection(
      database, {stub}, ConnectionOptions{grpc::InsecureChannelCredentials()});

  {
    // The expectations below must be satisfied in exactly this order.
    InSequence ordered;
    Status const permanent(StatusCode::kPermissionDenied,
                           "uh-oh in ExecuteDml");
    EXPECT_CALL(*stub, BatchCreateSessions(_, _))
        .WillOnce(Return(MakeSessionsResponse({"session-name"})));
    EXPECT_CALL(*stub, ExecuteSql(_, _)).WillOnce(Return(permanent));
    EXPECT_CALL(*stub, BeginTransaction(_, _))
        .WillOnce(Return(MakeTestTransaction()));
    EXPECT_CALL(*stub, ExecuteSql(_, _)).WillOnce(Return(permanent));
  }

  Transaction txn = MakeReadWriteTransaction(Transaction::ReadWriteOptions());
  auto outcome =
      connection->ExecuteDml({txn, SqlStatement("delete * from table")});

  auto const& status = outcome.status();
  EXPECT_EQ(StatusCode::kPermissionDenied, status.code());
  EXPECT_THAT(status.message(), HasSubstr("uh-oh in ExecuteDml"));
}
725 
// Checks that, on a limited-retry connection, `ExecuteDml` eventually returns
// the `kUnavailable` status when every `ExecuteSql` attempt fails with it.
TEST(ConnectionImplTest, ExecuteDmlDeleteTooManyTransientFailures) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  Database database("dummy_project", "dummy_instance", "dummy_database_id");
  auto connection = MakeLimitedRetryConnection(database, stub);

  Status unavailable(StatusCode::kUnavailable, "try-again in ExecuteDml");
  {
    InSequence ordered;
    EXPECT_CALL(*stub, BatchCreateSessions(_, _))
        .WillOnce(Return(MakeSessionsResponse({"session-name"})));
    // At least two failing attempts before the explicit `BeginTransaction`...
    EXPECT_CALL(*stub, ExecuteSql(_, _))
        .Times(AtLeast(2))
        .WillRepeatedly(Return(unavailable));
    EXPECT_CALL(*stub, BeginTransaction(_, _))
        .WillOnce(Return(MakeTestTransaction()));
    // ...and at least two more after it.
    EXPECT_CALL(*stub, ExecuteSql(_, _))
        .Times(AtLeast(2))
        .WillRepeatedly(Return(unavailable));
  }

  auto txn = MakeReadWriteTransaction(Transaction::ReadWriteOptions());
  auto outcome =
      connection->ExecuteDml({txn, SqlStatement("delete * from table")});

  EXPECT_THAT(outcome, StatusIs(StatusCode::kUnavailable,
                                HasSubstr("try-again in ExecuteDml")));
}
752 
753 // Tests that a Transaction that fails to begin does not successfully commit.
754 // That would violate atomicity since the first DML statement did not execute.
TEST(ConnectionImplTest, ExecuteDmlTransactionAtomicity) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  Database database("dummy_project", "dummy_instance", "dummy_database_id");
  auto connection = MakeConnection(
      database, {stub}, ConnectionOptions{grpc::InsecureChannelCredentials()});

  Status sql_error(StatusCode::kInvalidArgument, "ExecuteSql status");
  Status begin_error(StatusCode::kInvalidArgument, "BeginTransaction status");
  {
    InSequence ordered;
    EXPECT_CALL(*stub, BatchCreateSessions(_, _))
        .WillOnce(Return(MakeSessionsResponse({"session-name"})));

    // Only two transaction-related RPCs are expected: the `ExecuteSql` issued
    // by the first `ExecuteDml` (an implicit begin) and the explicit
    // `BeginTransaction` that follows it. Both fail. Any later operation on
    // the same transaction must be rejected by the client without an RPC.
    EXPECT_CALL(*stub, ExecuteSql(_, _)).WillOnce(Return(sql_error));
    EXPECT_CALL(*stub, BeginTransaction(_, _)).WillOnce(Return(begin_error));
  }

  auto txn = MakeReadWriteTransaction(Transaction::ReadWriteOptions());

  // The first operation reports the status of its own RPC (`ExecuteSql`).
  auto first = connection->ExecuteDml({txn, SqlStatement("some statement")});
  EXPECT_THAT(first,
              StatusIs(sql_error.code(), HasSubstr(sql_error.message())));

  // Every later operation reports the `BeginTransaction` status instead.
  auto second =
      connection->ExecuteDml({txn, SqlStatement("another statement")});
  EXPECT_THAT(second,
              StatusIs(begin_error.code(), HasSubstr(begin_error.message())));

  auto commit = connection->Commit({txn, {}});
  EXPECT_THAT(commit,
              StatusIs(begin_error.code(), HasSubstr(begin_error.message())));
}
787 
// Checks that `ExecuteDml` returns `kInternal` when the `ExecuteSql` response
// carries metadata but no transaction.
TEST(ConnectionImplTest, ExecuteDmlTransactionMissing) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  Database database("dummy_project", "dummy_instance", "dummy_database_id");
  auto connection = MakeConnection(
      database, {stub}, ConnectionOptions{grpc::InsecureChannelCredentials()});

  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(Return(MakeSessionsResponse({"session-name"})));

  // The response parses fine, but its metadata has no `transaction` field.
  spanner_proto::ResultSet no_txn_response;
  ASSERT_TRUE(TextFormat::ParseFromString("metadata: {}", &no_txn_response));
  EXPECT_CALL(*stub, ExecuteSql(_, _)).WillOnce(Return(no_txn_response));

  auto txn = MakeReadWriteTransaction(Transaction::ReadWriteOptions());
  auto outcome = connection->ExecuteDml({txn, SqlStatement("select 1")});
  EXPECT_THAT(
      outcome,
      StatusIs(StatusCode::kInternal,
               HasSubstr(
                   "Begin transaction requested but no transaction returned")));
}
809 
// Checks that `ProfileQuery` streams the rows from a single
// `PartialResultSet` and exposes the query plan and query stats that the
// same response carries.
TEST(ConnectionImplTest, ProfileQuerySuccess) {
  auto mock = std::make_shared<spanner_testing::MockSpannerStub>();
  auto db = Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto conn = MakeConnection(
      db, {mock}, ConnectionOptions{grpc::InsecureChannelCredentials()});
  EXPECT_CALL(*mock, BatchCreateSessions(_, _))
      .WillOnce(
          [&db](grpc::ClientContext&,
                spanner_proto::BatchCreateSessionsRequest const& request) {
            EXPECT_EQ(db.FullName(), request.database());
            return MakeSessionsResponse({"test-session-name"});
          });

  auto grpc_reader = absl::make_unique<MockGrpcReader>();
  auto constexpr kText = R"pb(
    metadata: {
      row_type: {
        fields: {
          name: "UserId",
          type: { code: INT64 }
        }
        fields: {
          name: "UserName",
          type: { code: STRING }
        }
      }
    }
    values: { string_value: "12" }
    values: { string_value: "Steve" }
    values: { string_value: "42" }
    values: { string_value: "Ann" }
    stats: {
      query_plan { plan_nodes: { index: 42 } }
      query_stats {
        fields {
          key: "elapsed_time"
          value { string_value: "42 secs" }
        }
      }
    }
  )pb";
  spanner_proto::PartialResultSet response;
  ASSERT_TRUE(TextFormat::ParseFromString(kText, &response));
  // The reader yields one response and then signals end-of-stream with OK.
  EXPECT_CALL(*grpc_reader, Read(_))
      .WillOnce(DoAll(SetArgPointee<0>(response), Return(true)))
      .WillOnce(Return(false));
  EXPECT_CALL(*grpc_reader, Finish()).WillOnce(Return(grpc::Status()));
  EXPECT_CALL(*mock, ExecuteStreamingSql(_, _))
      .WillOnce(Return(ByMove(std::move(grpc_reader))));

  auto result = conn->ProfileQuery(
      {MakeSingleUseTransaction(Transaction::ReadOnlyOptions()),
       SqlStatement("select * from table")});
  using RowType = std::tuple<std::int64_t, std::string>;
  auto expected = std::vector<RowType>{
      RowType(12, "Steve"),
      RowType(42, "Ann"),
  };
  // Use the container's own size type so the final comparison is not a
  // signed/unsigned mix.
  std::vector<RowType>::size_type row_number = 0;
  for (auto& row : StreamOf<RowType>(result)) {
    // Both checks are fatal: a failed row must not be dereferenced, and an
    // unexpected extra row must not index past the end of `expected`.
    ASSERT_STATUS_OK(row);
    ASSERT_LT(row_number, expected.size());
    EXPECT_EQ(*row, expected[row_number]);
    ++row_number;
  }
  EXPECT_EQ(row_number, expected.size());

  auto constexpr kTextExpectedPlan = R"pb(
    plan_nodes: { index: 42 }
  )pb";
  google::spanner::v1::QueryPlan expected_plan;
  ASSERT_TRUE(TextFormat::ParseFromString(kTextExpectedPlan, &expected_plan));

  auto plan = result.ExecutionPlan();
  ASSERT_TRUE(plan);
  EXPECT_THAT(*plan, IsProtoEqual(expected_plan));

  std::vector<std::pair<const std::string, std::string>> expected_stats;
  expected_stats.emplace_back("elapsed_time", "42 secs");
  auto execution_stats = result.ExecutionStats();
  ASSERT_TRUE(execution_stats);
  EXPECT_THAT(*execution_stats, UnorderedPointwise(Eq(), expected_stats));
}
892 
// Checks that `ProfileQuery` surfaces the `BatchCreateSessions` error through
// the returned row stream when no session can be created.
TEST(ConnectionImplTest, ProfileQueryGetSessionFailure) {
  auto mock = std::make_shared<spanner_testing::MockSpannerStub>();
  auto db = Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto conn = MakeConnection(
      db, {mock}, ConnectionOptions{grpc::InsecureChannelCredentials()});
  EXPECT_CALL(*mock, BatchCreateSessions(_, _))
      .WillOnce(
          [&db](grpc::ClientContext&,
                spanner_proto::BatchCreateSessionsRequest const& request) {
            EXPECT_EQ(db.FullName(), request.database());
            return Status(StatusCode::kPermissionDenied, "uh-oh in GetSession");
          });

  auto result = conn->ProfileQuery(
      {MakeSingleUseTransaction(Transaction::ReadOnlyOptions()),
       SqlStatement("select * from table")});
  // Count the rows so the test cannot pass vacuously: with an empty stream
  // the loop body, and therefore every assertion in it, would never run.
  int rows_seen = 0;
  for (auto& row : result) {
    ++rows_seen;
    EXPECT_EQ(StatusCode::kPermissionDenied, row.status().code());
    EXPECT_THAT(row.status().message(), HasSubstr("uh-oh in GetSession"));
  }
  EXPECT_GT(rows_seen, 0);
}
914 
// Checks that `ProfileQuery` surfaces the status returned by the gRPC
// reader's `Finish()` when the stream ends without delivering any response.
TEST(ConnectionImplTest, ProfileQueryStreamingReadFailure) {
  auto mock = std::make_shared<spanner_testing::MockSpannerStub>();

  auto db = Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto conn = MakeConnection(
      db, {mock}, ConnectionOptions{grpc::InsecureChannelCredentials()});
  EXPECT_CALL(*mock, BatchCreateSessions(_, _))
      .WillOnce(
          [&db](grpc::ClientContext&,
                spanner_proto::BatchCreateSessionsRequest const& request) {
            EXPECT_EQ(db.FullName(), request.database());
            return MakeSessionsResponse({"test-session-name"});
          });

  // The very first `Read()` reports end-of-stream and `Finish()` then
  // returns PERMISSION_DENIED.
  auto grpc_reader = absl::make_unique<MockGrpcReader>();
  EXPECT_CALL(*grpc_reader, Read(_)).WillOnce(Return(false));
  grpc::Status finish_status(grpc::StatusCode::PERMISSION_DENIED,
                             "uh-oh in GrpcReader::Finish");
  EXPECT_CALL(*grpc_reader, Finish()).WillOnce(Return(finish_status));
  EXPECT_CALL(*mock, ExecuteStreamingSql(_, _))
      .WillOnce(Return(ByMove(std::move(grpc_reader))));

  auto result = conn->ProfileQuery(
      {MakeSingleUseTransaction(Transaction::ReadOnlyOptions()),
       SqlStatement("select * from table")});
  // Count the rows so the test cannot pass vacuously: with an empty stream
  // the loop body, and therefore every assertion in it, would never run.
  int rows_seen = 0;
  for (auto& row : result) {
    ++rows_seen;
    EXPECT_EQ(StatusCode::kPermissionDenied, row.status().code());
    EXPECT_THAT(row.status().message(),
                HasSubstr("uh-oh in GrpcReader::Finish"));
  }
  EXPECT_GT(rows_seen, 0);
}
946 
// Checks that `ProfileDml` returns the `BatchCreateSessions` error when no
// session can be created.
TEST(ConnectionImplTest, ProfileDmlGetSessionFailure) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  Database database("dummy_project", "dummy_instance", "dummy_database_id");
  auto connection = MakeConnection(
      database, {stub}, ConnectionOptions{grpc::InsecureChannelCredentials()});

  // Verify the request targets this database, then fail session creation.
  auto fail_create_sessions =
      [&database](grpc::ClientContext&,
                  spanner_proto::BatchCreateSessionsRequest const& req) {
        EXPECT_EQ(database.FullName(), req.database());
        return Status(StatusCode::kPermissionDenied, "uh-oh in GetSession");
      };
  EXPECT_CALL(*stub, BatchCreateSessions(_, _)).WillOnce(fail_create_sessions);

  auto txn = MakeReadWriteTransaction(Transaction::ReadWriteOptions());
  auto outcome =
      connection->ProfileDml({txn, SqlStatement("delete * from table")});

  EXPECT_THAT(outcome, StatusIs(StatusCode::kPermissionDenied,
                                HasSubstr("uh-oh in GetSession")));
}
966 
// Checks that `ProfileDml` succeeds when the first `ExecuteSql` attempt
// returns `kUnavailable` and the second returns a response, and that the row
// count, query plan and query stats in that response are all exposed.
TEST(ConnectionImplTest, ProfileDmlDeleteSuccess) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  Database database("dummy_project", "dummy_instance", "dummy_database_id");
  auto connection = MakeConnection(
      database, {stub}, ConnectionOptions{grpc::InsecureChannelCredentials()});

  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(Return(MakeSessionsResponse({"session-name"})));

  spanner_proto::ResultSet dml_response;
  auto constexpr kResponseText = R"pb(
    metadata: { transaction: { id: "1234567890" } }
    stats: {
      row_count_exact: 42
      query_plan { plan_nodes: { index: 42 } }
      query_stats {
        fields {
          key: "elapsed_time"
          value { string_value: "42 secs" }
        }
      }
    }
  )pb";
  ASSERT_TRUE(TextFormat::ParseFromString(kResponseText, &dml_response));

  // First attempt: `kUnavailable`. Second attempt: the parsed response.
  Status unavailable(StatusCode::kUnavailable, "try-again");
  EXPECT_CALL(*stub, ExecuteSql(_, _))
      .WillOnce(Return(unavailable))
      .WillOnce(Return(dml_response));

  auto txn = MakeReadWriteTransaction(Transaction::ReadWriteOptions());
  auto outcome =
      connection->ProfileDml({txn, SqlStatement("delete * from table")});

  ASSERT_STATUS_OK(outcome);
  EXPECT_EQ(outcome->RowsModified(), 42);

  // The plan returned to the caller must equal the one in the response.
  google::spanner::v1::QueryPlan want_plan;
  auto constexpr kPlanText = R"pb(
    plan_nodes: { index: 42 }
  )pb";
  ASSERT_TRUE(TextFormat::ParseFromString(kPlanText, &want_plan));

  auto got_plan = outcome->ExecutionPlan();
  ASSERT_TRUE(got_plan);
  EXPECT_THAT(*got_plan, IsProtoEqual(want_plan));

  // The stats must contain exactly the one entry from the response.
  std::vector<std::pair<const std::string, std::string>> want_stats = {
      {"elapsed_time", "42 secs"}};
  auto got_stats = outcome->ExecutionStats();
  ASSERT_TRUE(got_stats);
  EXPECT_THAT(*got_stats, UnorderedPointwise(Eq(), want_stats));
}
1017 
// Checks that `ProfileDml` returns the `kPermissionDenied` status produced by
// `ExecuteSql`. The expected RPC order is: create sessions, `ExecuteSql`
// (fails), `BeginTransaction`, `ExecuteSql` (fails again).
TEST(ConnectionImplTest, ProfileDmlDeletePermanentFailure) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  Database database("dummy_project", "dummy_instance", "dummy_database_id");
  auto connection = MakeConnection(
      database, {stub}, ConnectionOptions{grpc::InsecureChannelCredentials()});

  Status denied(StatusCode::kPermissionDenied, "uh-oh in ExecuteDml");
  {
    InSequence ordered;
    EXPECT_CALL(*stub, BatchCreateSessions(_, _))
        .WillOnce(Return(MakeSessionsResponse({"session-name"})));
    EXPECT_CALL(*stub, ExecuteSql(_, _)).WillOnce(Return(denied));
    EXPECT_CALL(*stub, BeginTransaction(_, _))
        .WillOnce(Return(MakeTestTransaction()));
    EXPECT_CALL(*stub, ExecuteSql(_, _)).WillOnce(Return(denied));
  }

  auto txn = MakeReadWriteTransaction(Transaction::ReadWriteOptions());
  auto outcome =
      connection->ProfileDml({txn, SqlStatement("delete * from table")});

  EXPECT_THAT(outcome, StatusIs(StatusCode::kPermissionDenied,
                                HasSubstr("uh-oh in ExecuteDml")));
}
1041 
// Checks that, on a limited-retry connection, `ProfileDml` eventually returns
// the `kUnavailable` status when every `ExecuteSql` attempt fails with it.
TEST(ConnectionImplTest, ProfileDmlDeleteTooManyTransientFailures) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  Database database("dummy_project", "dummy_instance", "dummy_database_id");
  auto connection = MakeLimitedRetryConnection(database, stub);

  Status unavailable(StatusCode::kUnavailable, "try-again in ExecuteDml");
  {
    InSequence ordered;
    EXPECT_CALL(*stub, BatchCreateSessions(_, _))
        .WillOnce(Return(MakeSessionsResponse({"session-name"})));
    // At least two failing attempts before the explicit `BeginTransaction`...
    EXPECT_CALL(*stub, ExecuteSql(_, _))
        .Times(AtLeast(2))
        .WillRepeatedly(Return(unavailable));
    EXPECT_CALL(*stub, BeginTransaction(_, _))
        .WillOnce(Return(MakeTestTransaction()));
    // ...and at least two more after it.
    EXPECT_CALL(*stub, ExecuteSql(_, _))
        .Times(AtLeast(2))
        .WillRepeatedly(Return(unavailable));
  }

  auto txn = MakeReadWriteTransaction(Transaction::ReadWriteOptions());
  auto outcome =
      connection->ProfileDml({txn, SqlStatement("delete * from table")});

  EXPECT_THAT(outcome, StatusIs(StatusCode::kUnavailable,
                                HasSubstr("try-again in ExecuteDml")));
}
1068 
// Checks that `AnalyzeSql` succeeds when the first `ExecuteSql` attempt
// returns `kUnavailable` and the second returns a response, and that the
// query plan in that response is what the caller receives.
TEST(ConnectionImplTest, AnalyzeSqlSuccess) {
  auto mock = std::make_shared<spanner_testing::MockSpannerStub>();

  auto db = Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto conn = MakeConnection(
      db, {mock}, ConnectionOptions{grpc::InsecureChannelCredentials()});
  EXPECT_CALL(*mock, BatchCreateSessions(_, _))
      .WillOnce(
          [&db](grpc::ClientContext&,
                spanner_proto::BatchCreateSessionsRequest const& request) {
            EXPECT_EQ(db.FullName(), request.database());
            return MakeSessionsResponse({"test-session-name"});
          });

  // This test only expects the non-streaming `ExecuteSql` RPC, so no gRPC
  // reader mock is created here.
  auto constexpr kText = R"pb(
    metadata: {}
    stats: { query_plan { plan_nodes: { index: 42 } } }
  )pb";
  spanner_proto::ResultSet response;
  ASSERT_TRUE(TextFormat::ParseFromString(kText, &response));
  EXPECT_CALL(*mock, ExecuteSql(_, _))
      .WillOnce(Return(Status(StatusCode::kUnavailable, "try-again")))
      .WillOnce(Return(ByMove(std::move(response))));

  auto result = conn->AnalyzeSql(
      {MakeSingleUseTransaction(Transaction::ReadOnlyOptions()),
       SqlStatement("select * from table")});

  auto constexpr kTextExpectedPlan = R"pb(
    plan_nodes: { index: 42 }
  )pb";
  google::spanner::v1::QueryPlan expected_plan;
  ASSERT_TRUE(TextFormat::ParseFromString(kTextExpectedPlan, &expected_plan));

  ASSERT_STATUS_OK(result);
  EXPECT_THAT(*result, IsProtoEqual(expected_plan));
}
1107 
// Checks that `AnalyzeSql` returns the `BatchCreateSessions` error when no
// session can be created.
TEST(ConnectionImplTest, AnalyzeSqlGetSessionFailure) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  Database database("dummy_project", "dummy_instance", "dummy_database_id");
  auto connection = MakeConnection(
      database, {stub}, ConnectionOptions{grpc::InsecureChannelCredentials()});

  // Verify the request targets this database, then fail session creation.
  auto fail_create_sessions =
      [&database](grpc::ClientContext&,
                  spanner_proto::BatchCreateSessionsRequest const& req) {
        EXPECT_EQ(database.FullName(), req.database());
        return Status(StatusCode::kPermissionDenied, "uh-oh in GetSession");
      };
  EXPECT_CALL(*stub, BatchCreateSessions(_, _)).WillOnce(fail_create_sessions);

  auto txn = MakeReadWriteTransaction(Transaction::ReadWriteOptions());
  auto outcome =
      connection->AnalyzeSql({txn, SqlStatement("delete * from table")});

  EXPECT_THAT(outcome, StatusIs(StatusCode::kPermissionDenied,
                                HasSubstr("uh-oh in GetSession")));
}
1127 
// Checks that `AnalyzeSql` returns the `kPermissionDenied` status produced by
// `ExecuteSql`. The expected RPC order is: create sessions, `ExecuteSql`
// (fails), `BeginTransaction`, `ExecuteSql` (fails again).
TEST(ConnectionImplTest, AnalyzeSqlDeletePermanentFailure) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  Database database("dummy_project", "dummy_instance", "dummy_database_id");
  auto connection = MakeConnection(
      database, {stub}, ConnectionOptions{grpc::InsecureChannelCredentials()});

  Status denied(StatusCode::kPermissionDenied, "uh-oh in ExecuteDml");
  {
    InSequence ordered;
    EXPECT_CALL(*stub, BatchCreateSessions(_, _))
        .WillOnce(Return(MakeSessionsResponse({"session-name"})));
    EXPECT_CALL(*stub, ExecuteSql(_, _)).WillOnce(Return(denied));
    EXPECT_CALL(*stub, BeginTransaction(_, _))
        .WillOnce(Return(MakeTestTransaction()));
    EXPECT_CALL(*stub, ExecuteSql(_, _)).WillOnce(Return(denied));
  }

  auto txn = MakeReadWriteTransaction(Transaction::ReadWriteOptions());
  auto outcome =
      connection->AnalyzeSql({txn, SqlStatement("delete * from table")});

  EXPECT_THAT(outcome, StatusIs(StatusCode::kPermissionDenied,
                                HasSubstr("uh-oh in ExecuteDml")));
}
1151 
// Checks that, on a limited-retry connection, `AnalyzeSql` eventually returns
// the `kUnavailable` status when every `ExecuteSql` attempt fails with it.
TEST(ConnectionImplTest, AnalyzeSqlDeleteTooManyTransientFailures) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  Database database("dummy_project", "dummy_instance", "dummy_database_id");
  auto connection = MakeLimitedRetryConnection(database, stub);

  Status unavailable(StatusCode::kUnavailable, "try-again in ExecuteDml");
  {
    InSequence ordered;
    EXPECT_CALL(*stub, BatchCreateSessions(_, _))
        .WillOnce(Return(MakeSessionsResponse({"session-name"})));
    // At least two failing attempts before the explicit `BeginTransaction`...
    EXPECT_CALL(*stub, ExecuteSql(_, _))
        .Times(AtLeast(2))
        .WillRepeatedly(Return(unavailable));
    EXPECT_CALL(*stub, BeginTransaction(_, _))
        .WillOnce(Return(MakeTestTransaction()));
    // ...and at least two more after it.
    EXPECT_CALL(*stub, ExecuteSql(_, _))
        .Times(AtLeast(2))
        .WillRepeatedly(Return(unavailable));
  }

  auto txn = MakeReadWriteTransaction(Transaction::ReadWriteOptions());
  auto outcome =
      connection->AnalyzeSql({txn, SqlStatement("delete * from table")});

  EXPECT_THAT(outcome, StatusIs(StatusCode::kUnavailable,
                                HasSubstr("try-again in ExecuteDml")));
}
1178 
// Checks that `ExecuteBatchDml` succeeds when the first RPC attempt returns
// `kUnavailable` and the second returns three result sets, that one stats
// entry per statement is reported, and that the transaction picks up the
// session name and the transaction id from the first result set.
TEST(ConnectionImplTest, ExecuteBatchDmlSuccess) {
  auto db = Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto mock = std::make_shared<spanner_testing::MockSpannerStub>();

  EXPECT_CALL(*mock, BatchCreateSessions(_, _))
      .WillOnce(Return(MakeSessionsResponse({"session-name"})));

  auto constexpr kText = R"pb(
    result_sets: {
      metadata: { transaction: { id: "1234567890" } }
      stats: { row_count_exact: 0 }
    }
    result_sets: { stats: { row_count_exact: 1 } }
    result_sets: { stats: { row_count_exact: 2 } }
  )pb";
  spanner_proto::ExecuteBatchDmlResponse response;
  ASSERT_TRUE(TextFormat::ParseFromString(kText, &response));
  EXPECT_CALL(*mock, ExecuteBatchDml(_, _))
      .WillOnce(Return(Status(StatusCode::kUnavailable, "try-again")))
      .WillOnce(Return(response));

  auto request = {
      SqlStatement("update ..."),
      SqlStatement("update ..."),
      SqlStatement("update ..."),
  };

  auto conn = MakeConnection(
      db, {mock}, ConnectionOptions{grpc::InsecureChannelCredentials()});
  auto txn = MakeReadWriteTransaction();
  auto result = conn->ExecuteBatchDml({txn, request});
  // Fatal: `result` is dereferenced below, which is only valid when it
  // holds a value.
  ASSERT_STATUS_OK(result);
  EXPECT_STATUS_OK(result->status);
  EXPECT_EQ(result->stats.size(), request.size());
  // Fatal: the element accesses below index `stats[0..2]`.
  ASSERT_EQ(result->stats.size(), 3);
  EXPECT_EQ(result->stats[0].row_count, 0);
  EXPECT_EQ(result->stats[1].row_count, 1);
  EXPECT_EQ(result->stats[2].row_count, 2);
  EXPECT_THAT(txn, HasSessionAndTransactionId("session-name", "1234567890"));
}
1219 
// Checks that when the `ExecuteBatchDml` response has fewer result sets than
// statements plus an error status, the call itself still returns a value:
// the embedded status is reported in `status` and only the completed
// statements appear in `stats`.
TEST(ConnectionImplTest, ExecuteBatchDmlPartialFailure) {
  auto db = Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto mock = std::make_shared<spanner_testing::MockSpannerStub>();

  EXPECT_CALL(*mock, BatchCreateSessions(_, _))
      .WillOnce(Return(MakeSessionsResponse({"session-name"})));

  auto constexpr kText = R"pb(
    result_sets: {
      metadata: { transaction: { id: "1234567890" } }
      stats: { row_count_exact: 42 }
    }
    result_sets: { stats: { row_count_exact: 43 } }
    status: { code: 2 message: "oops" }
  )pb";
  spanner_proto::ExecuteBatchDmlResponse response;
  ASSERT_TRUE(TextFormat::ParseFromString(kText, &response));
  EXPECT_CALL(*mock, ExecuteBatchDml(_, _)).WillOnce(Return(response));

  auto request = {
      SqlStatement("update ..."),
      SqlStatement("update ..."),
      SqlStatement("update ..."),
  };

  auto conn = MakeConnection(
      db, {mock}, ConnectionOptions{grpc::InsecureChannelCredentials()});
  auto txn = MakeReadWriteTransaction();
  auto result = conn->ExecuteBatchDml({txn, request});
  // Fatal: `result` is dereferenced below, which is only valid when it
  // holds a value.
  ASSERT_STATUS_OK(result);
  EXPECT_EQ(result->status.code(), StatusCode::kUnknown);
  EXPECT_EQ(result->status.message(), "oops");
  EXPECT_NE(result->stats.size(), request.size());
  // Fatal: the element accesses below index `stats[0..1]`.
  ASSERT_EQ(result->stats.size(), 2);
  EXPECT_EQ(result->stats[0].row_count, 42);
  EXPECT_EQ(result->stats[1].row_count, 43);
  EXPECT_THAT(txn, HasSessionAndTransactionId("session-name", "1234567890"));
}
1258 
// Checks that `ExecuteBatchDml` returns the `kPermissionDenied` status
// produced by the RPC. The expected RPC order is: create sessions,
// `ExecuteBatchDml` (fails), `BeginTransaction`, `ExecuteBatchDml` (fails
// again).
TEST(ConnectionImplTest, ExecuteBatchDmlPermanentFailure) {
  Database database("dummy_project", "dummy_instance", "dummy_database_id");
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();

  Status denied(StatusCode::kPermissionDenied, "uh-oh in ExecuteBatchDml");
  {
    InSequence ordered;
    EXPECT_CALL(*stub, BatchCreateSessions(_, _))
        .WillOnce(Return(MakeSessionsResponse({"session-name"})));
    EXPECT_CALL(*stub, ExecuteBatchDml(_, _)).WillOnce(Return(denied));
    EXPECT_CALL(*stub, BeginTransaction(_, _))
        .WillOnce(Return(MakeTestTransaction()));
    EXPECT_CALL(*stub, ExecuteBatchDml(_, _)).WillOnce(Return(denied));
  }

  auto statements = {
      SqlStatement("update ..."),
      SqlStatement("update ..."),
      SqlStatement("update ..."),
  };

  auto connection = MakeLimitedRetryConnection(database, stub);
  auto txn = MakeReadWriteTransaction();
  auto outcome = connection->ExecuteBatchDml({txn, statements});
  EXPECT_THAT(outcome, StatusIs(StatusCode::kPermissionDenied,
                                HasSubstr("uh-oh in ExecuteBatchDml")));
}
1287 
// Checks that, on a limited-retry connection, `ExecuteBatchDml` eventually
// returns the `kUnavailable` status when every RPC attempt fails with it.
TEST(ConnectionImplTest, ExecuteBatchDmlTooManyTransientFailures) {
  Database database("dummy_project", "dummy_instance", "dummy_database_id");
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();

  Status unavailable(StatusCode::kUnavailable, "try-again in ExecuteBatchDml");
  {
    InSequence ordered;
    EXPECT_CALL(*stub, BatchCreateSessions(_, _))
        .WillOnce(Return(MakeSessionsResponse({"session-name"})));
    // At least two failing attempts before the explicit `BeginTransaction`...
    EXPECT_CALL(*stub, ExecuteBatchDml(_, _))
        .Times(AtLeast(2))
        .WillRepeatedly(Return(unavailable));
    EXPECT_CALL(*stub, BeginTransaction(_, _))
        .WillOnce(Return(MakeTestTransaction()));
    // ...and at least two more after it.
    EXPECT_CALL(*stub, ExecuteBatchDml(_, _))
        .Times(AtLeast(2))
        .WillRepeatedly(Return(unavailable));
  }

  auto statements = {
      SqlStatement("update ..."),
      SqlStatement("update ..."),
      SqlStatement("update ..."),
  };

  auto connection = MakeLimitedRetryConnection(database, stub);
  auto txn = MakeReadWriteTransaction();
  auto outcome = connection->ExecuteBatchDml({txn, statements});
  EXPECT_THAT(outcome, StatusIs(StatusCode::kUnavailable,
                                HasSubstr("try-again in ExecuteBatchDml")));
}
1321 
// Checks the case where the `ExecuteBatchDml` RPC itself succeeds but the
// response has no result sets and an error status. The expected RPC order
// is: create sessions, `ExecuteBatchDml` asking to begin a transaction,
// `BeginTransaction`, then `ExecuteBatchDml` using the returned transaction
// id. The embedded error must be reported in `result->status`.
TEST(ConnectionImplTest, ExecuteBatchDmlNoResultSets) {
  auto db = Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto mock = std::make_shared<spanner_testing::MockSpannerStub>();

  {
    InSequence seq;
    EXPECT_CALL(*mock, BatchCreateSessions(_, _))
        .WillOnce(Return(MakeSessionsResponse({"session-name"})));
    // The `ExecuteBatchDml` call can succeed, but with no `ResultSet`s and an
    // error status in the response.
    auto constexpr kText =
        R"pb(status: { code: 6 message: "failed to insert ..." })pb";
    spanner_proto::ExecuteBatchDmlResponse response;
    ASSERT_TRUE(TextFormat::ParseFromString(kText, &response));
    EXPECT_CALL(
        *mock, ExecuteBatchDml(
                   _, ReadRequestHasSessionAndBeginTransaction("session-name")))
        .WillOnce(Return(response));
    EXPECT_CALL(*mock, BeginTransaction(_, _))
        .WillOnce(Return(MakeTestTransaction("BD000001")));
    EXPECT_CALL(*mock, ExecuteBatchDml(_, ReadRequestHasSessionAndTransactionId(
                                              "session-name", "BD000001")))
        .WillOnce(Return(response));
  }

  auto request = {SqlStatement("update ...")};
  auto conn = MakeConnection(
      db, {mock}, ConnectionOptions{grpc::InsecureChannelCredentials()});
  auto txn = MakeReadWriteTransaction();
  auto result = conn->ExecuteBatchDml({txn, request});
  // Fatal: `result` is dereferenced below, which is only valid when it
  // holds a value.
  ASSERT_STATUS_OK(result);
  EXPECT_THAT(result->status, StatusIs(StatusCode::kAlreadyExists,
                                       HasSubstr("failed to insert ...")));
}
1356 
// Checks that `ExecutePartitionedDml` succeeds when both `BeginTransaction`
// and `ExecuteSql` return `kUnavailable` once before succeeding, and that the
// `row_count_lower_bound` value in the response is reported to the caller.
TEST(ConnectionImplTest, ExecutePartitionedDmlDeleteSuccess) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  Database database("dummy_project", "dummy_instance", "dummy_database_id");
  auto connection = MakeConnection(
      database, {stub}, ConnectionOptions{grpc::InsecureChannelCredentials()});

  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(Return(MakeSessionsResponse({"session-name"})));

  Status unavailable(StatusCode::kUnavailable, "try-again");

  // `BeginTransaction`: one `kUnavailable`, then a transaction.
  spanner_proto::Transaction begin_response;
  auto constexpr kBeginText = R"pb(
    id: "1234567890"
  )pb";
  ASSERT_TRUE(TextFormat::ParseFromString(kBeginText, &begin_response));
  EXPECT_CALL(*stub, BeginTransaction(_, _))
      .WillOnce(Return(unavailable))
      .WillOnce(Return(begin_response));

  // `ExecuteSql`: one `kUnavailable`, then a result set.
  spanner_proto::ResultSet sql_response;
  auto constexpr kSqlText = R"pb(
    metadata: { transaction: { id: "1234567890" } }
    stats: { row_count_lower_bound: 42 }
  )pb";
  ASSERT_TRUE(TextFormat::ParseFromString(kSqlText, &sql_response));
  EXPECT_CALL(*stub, ExecuteSql(_, _))
      .WillOnce(Return(unavailable))
      .WillOnce(Return(sql_response));

  auto outcome =
      connection->ExecutePartitionedDml({SqlStatement("delete * from table")});

  ASSERT_STATUS_OK(outcome);
  EXPECT_EQ(outcome->row_count_lower_bound, 42);
}
1391 
// Checks that `ExecutePartitionedDml` returns the `BatchCreateSessions` error
// when no session can be created.
TEST(ConnectionImplTest, ExecutePartitionedDmlGetSessionFailure) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  Database database("dummy_project", "dummy_instance", "dummy_database_id");
  auto connection = MakeConnection(
      database, {stub}, ConnectionOptions{grpc::InsecureChannelCredentials()});

  // Verify the request targets this database, then fail session creation.
  auto fail_create_sessions =
      [&database](grpc::ClientContext&,
                  spanner_proto::BatchCreateSessionsRequest const& req) {
        EXPECT_EQ(database.FullName(), req.database());
        return Status(StatusCode::kPermissionDenied, "uh-oh in GetSession");
      };
  EXPECT_CALL(*stub, BatchCreateSessions(_, _)).WillOnce(fail_create_sessions);

  auto outcome =
      connection->ExecutePartitionedDml({SqlStatement("delete * from table")});

  EXPECT_THAT(outcome, StatusIs(StatusCode::kPermissionDenied,
                                HasSubstr("uh-oh in GetSession")));
}
1411 
// `BeginTransaction()` is mocked to succeed after one transient error, then
// `ExecuteSql()` fails once with `kPermissionDenied`. The test expects that
// error to be returned from `ExecutePartitionedDml()`.
TEST(ConnectionImplTest, ExecutePartitionedDmlDeletePermanentFailure) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  Database database("dummy_project", "dummy_instance", "dummy_database_id");
  auto connection = MakeConnection(
      database, {stub}, ConnectionOptions{grpc::InsecureChannelCredentials()});

  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(Return(MakeSessionsResponse({"session-name"})));

  auto constexpr kTransactionText = R"pb(
    id: "1234567890"
  )pb";
  spanner_proto::Transaction begin_response;
  ASSERT_TRUE(TextFormat::ParseFromString(kTransactionText, &begin_response));
  EXPECT_CALL(*stub, BeginTransaction(_, _))
      .WillOnce(Return(Status(StatusCode::kUnavailable, "try-again")))
      .WillOnce(Return(begin_response));

  // A single expected call: the permanent error is returned only once.
  Status const permanent(StatusCode::kPermissionDenied,
                         "uh-oh in ExecutePartitionedDml");
  EXPECT_CALL(*stub, ExecuteSql(_, _)).WillOnce(Return(permanent));

  auto dml_result =
      connection->ExecutePartitionedDml({SqlStatement("delete * from table")});

  EXPECT_THAT(dml_result,
              StatusIs(StatusCode::kPermissionDenied,
                       HasSubstr("uh-oh in ExecutePartitionedDml")));
}
1441 
// `ExecuteSql()` is mocked to always fail with `kUnavailable`. Using the
// connection built by `MakeLimitedRetryConnection()`, the test expects at
// least two attempts and then the transient error returned to the caller.
TEST(ConnectionImplTest, ExecutePartitionedDmlDeleteTooManyTransientFailures) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  Database database("dummy_project", "dummy_instance", "dummy_database_id");
  auto connection = MakeLimitedRetryConnection(database, stub);

  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(Return(MakeSessionsResponse({"session-name"})));

  auto constexpr kTransactionText = R"pb(
    id: "1234567890"
  )pb";
  spanner_proto::Transaction begin_response;
  ASSERT_TRUE(TextFormat::ParseFromString(kTransactionText, &begin_response));
  EXPECT_CALL(*stub, BeginTransaction(_, _))
      .WillOnce(Return(Status(StatusCode::kUnavailable, "try-again")))
      .WillOnce(Return(begin_response));

  Status const transient(StatusCode::kUnavailable,
                         "try-again in ExecutePartitionedDml");
  EXPECT_CALL(*stub, ExecuteSql(_, _))
      .Times(AtLeast(2))
      .WillRepeatedly(Return(transient));

  auto dml_result =
      connection->ExecutePartitionedDml({SqlStatement("delete * from table")});

  EXPECT_THAT(dml_result,
              StatusIs(StatusCode::kUnavailable,
                       HasSubstr("try-again in ExecutePartitionedDml")));
}
1471 
// `BeginTransaction()` fails once with `kPermissionDenied`; the test expects
// `ExecutePartitionedDml()` to return that error.
TEST(ConnectionImplTest,
     ExecutePartitionedDmlDeleteBeginTransactionPermanentFailure) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  Database database("dummy_project", "dummy_instance", "dummy_database_id");
  auto connection = MakeConnection(
      database, {stub}, ConnectionOptions{grpc::InsecureChannelCredentials()});

  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(Return(MakeSessionsResponse({"session-name"})));

  Status const permanent(StatusCode::kPermissionDenied,
                         "uh-oh in ExecutePartitionedDml");
  EXPECT_CALL(*stub, BeginTransaction(_, _)).WillOnce(Return(permanent));

  auto dml_result =
      connection->ExecutePartitionedDml({SqlStatement("delete * from table")});

  EXPECT_THAT(dml_result,
              StatusIs(StatusCode::kPermissionDenied,
                       HasSubstr("uh-oh in ExecutePartitionedDml")));
}
1493 
// `BeginTransaction()` is mocked to always fail with `kUnavailable`. The test
// expects at least two attempts and then the transient error returned from
// `ExecutePartitionedDml()`.
TEST(ConnectionImplTest,
     ExecutePartitionedDmlDeleteBeginTransactionTooManyTransientFailures) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  Database database("dummy_project", "dummy_instance", "dummy_database_id");
  auto connection = MakeLimitedRetryConnection(database, stub);

  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(Return(MakeSessionsResponse({"session-name"})));

  Status const transient(StatusCode::kUnavailable,
                         "try-again in ExecutePartitionedDml");
  EXPECT_CALL(*stub, BeginTransaction(_, _))
      .Times(AtLeast(2))
      .WillRepeatedly(Return(transient));

  auto dml_result =
      connection->ExecutePartitionedDml({SqlStatement("delete * from table")});

  EXPECT_THAT(dml_result,
              StatusIs(StatusCode::kUnavailable,
                       HasSubstr("try-again in ExecutePartitionedDml")));
}
1515 
// Session creation fails once with `kPermissionDenied`; the test expects
// `Commit()` to return that error.
TEST(ConnectionImplTest, CommitGetSessionPermanentFailure) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();

  Database database("dummy_project", "dummy_instance", "dummy_database_id");
  auto connection = MakeLimitedRetryConnection(database, stub);
  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(
          [&database](grpc::ClientContext&,
                      spanner_proto::BatchCreateSessionsRequest const& req) {
            EXPECT_EQ(database.FullName(), req.database());
            return Status(StatusCode::kPermissionDenied, "uh-oh in GetSession");
          });

  auto commit_result = connection->Commit({MakeReadWriteTransaction()});
  EXPECT_THAT(commit_result, StatusIs(StatusCode::kPermissionDenied,
                                      HasSubstr("uh-oh in GetSession")));
}
1533 
// Session creation is mocked to always fail with `kUnavailable`. The test
// expects at least two attempts and then the transient error returned from
// `Commit()`.
TEST(ConnectionImplTest, CommitGetSessionTooManyTransientFailures) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();

  Database database("dummy_project", "dummy_instance", "dummy_database_id");
  auto connection = MakeLimitedRetryConnection(database, stub);
  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .Times(AtLeast(2))
      .WillRepeatedly(
          [&database](grpc::ClientContext&,
                      spanner_proto::BatchCreateSessionsRequest const& req) {
            EXPECT_EQ(database.FullName(), req.database());
            return Status(StatusCode::kUnavailable, "try-again in GetSession");
          });

  auto commit_result = connection->Commit({MakeReadWriteTransaction()});
  EXPECT_THAT(commit_result, StatusIs(StatusCode::kUnavailable,
                                      HasSubstr("try-again in GetSession")));
}
1552 
// Session creation fails once with `kUnavailable` and then succeeds. The
// subsequent `Commit()` RPC is checked for the session name and transaction
// id, and fails with `kPermissionDenied`, which the test expects to be
// returned to the caller.
TEST(ConnectionImplTest, CommitGetSessionRetry) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();

  spanner_proto::Transaction begin_response = MakeTestTransaction();
  Database database("dummy_project", "dummy_instance", "dummy_database_id");
  auto connection = MakeLimitedRetryConnection(database, stub);
  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(
          [&database](grpc::ClientContext&,
                      spanner_proto::BatchCreateSessionsRequest const& req) {
            EXPECT_EQ(database.FullName(), req.database());
            return Status(StatusCode::kUnavailable, "try-again in GetSession");
          })
      .WillOnce(
          [&database](grpc::ClientContext&,
                      spanner_proto::BatchCreateSessionsRequest const& req) {
            EXPECT_EQ(database.FullName(), req.database());
            return MakeSessionsResponse({"test-session-name"});
          });
  EXPECT_CALL(*stub, BeginTransaction(_, _)).WillOnce(Return(begin_response));
  EXPECT_CALL(*stub, Commit(_, _))
      .WillOnce([&begin_response](grpc::ClientContext&,
                                  spanner_proto::CommitRequest const& req) {
        EXPECT_EQ("test-session-name", req.session());
        EXPECT_EQ(begin_response.id(), req.transaction_id());
        return Status(StatusCode::kPermissionDenied, "uh-oh in Commit");
      });

  auto commit_result = connection->Commit({MakeReadWriteTransaction()});
  EXPECT_THAT(commit_result, StatusIs(StatusCode::kPermissionDenied,
                                      HasSubstr("uh-oh in Commit")));
}
1584 
// `BeginTransaction()` fails once with `kUnavailable` and then succeeds. The
// mocked `Commit()` returns a response carrying a commit timestamp, and the
// test expects that timestamp in the result of `Commit()`.
TEST(ConnectionImplTest, CommitBeginTransactionRetry) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();

  spanner_proto::Transaction begin_response = MakeTestTransaction();
  Database database("dummy_project", "dummy_instance", "dummy_database_id");
  auto connection = MakeLimitedRetryConnection(database, stub);
  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(
          [&database](grpc::ClientContext&,
                      spanner_proto::BatchCreateSessionsRequest const& req) {
            EXPECT_EQ(database.FullName(), req.database());
            return MakeSessionsResponse({"test-session-name"});
          });
  EXPECT_CALL(*stub, BeginTransaction(_, _))
      .WillOnce(Return(Status(StatusCode::kUnavailable, "try-again")))
      .WillOnce(Return(begin_response));

  auto const expected_timestamp =
      MakeTimestamp(std::chrono::system_clock::from_time_t(123)).value();
  EXPECT_CALL(*stub, Commit(_, _))
      .WillOnce([&begin_response, expected_timestamp](
                    grpc::ClientContext&,
                    spanner_proto::CommitRequest const& req) {
        EXPECT_EQ("test-session-name", req.session());
        EXPECT_EQ(begin_response.id(), req.transaction_id());
        spanner_proto::CommitResponse commit_response;
        *commit_response.mutable_commit_timestamp() =
            internal::TimestampToProto(expected_timestamp);
        return commit_response;
      });

  auto commit_result = connection->Commit({MakeReadWriteTransaction()});
  EXPECT_STATUS_OK(commit_result);
  EXPECT_EQ(expected_timestamp, commit_result->commit_timestamp);
}
1619 
// `BeginTransaction()` fails with a "Session not found" `kNotFound` error.
// The test expects `Commit()` to fail with a status recognized by
// `IsSessionNotFound()`, and the transaction to match `HasBadSession()`.
TEST(ConnectionImplTest, CommitBeginTransactionSessionNotFound) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  Database database("dummy_project", "dummy_instance", "dummy_database_id");
  auto connection = MakeLimitedRetryConnection(database, stub);
  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(
          [&database](grpc::ClientContext&,
                      spanner_proto::BatchCreateSessionsRequest const& req) {
            EXPECT_EQ(database.FullName(), req.database());
            return MakeSessionsResponse({"test-session-name"});
          });
  Status const not_found(StatusCode::kNotFound, "Session not found");
  EXPECT_CALL(*stub, BeginTransaction(_, _)).WillOnce(Return(not_found));

  auto transaction = MakeReadWriteTransaction();
  auto commit_result = connection->Commit({transaction});
  EXPECT_FALSE(commit_result.ok());
  Status const commit_status = commit_result.status();
  EXPECT_TRUE(IsSessionNotFound(commit_status)) << commit_status;
  EXPECT_THAT(transaction, HasBadSession());
}
1640 
// `BeginTransaction()` is expected exactly once and fails with
// `kInvalidArgument`. Committing the same transaction twice must produce the
// same error both times.
TEST(ConnectionImplTest, CommitBeginTransactionPermanentFailure) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  Database database("dummy_project", "dummy_instance", "dummy_database_id");
  auto connection = MakeLimitedRetryConnection(database, stub);
  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(Return(MakeSessionsResponse({"session-name"})));
  Status const begin_failure(StatusCode::kInvalidArgument,
                             "BeginTransaction failed");
  EXPECT_CALL(*stub, BeginTransaction(_, _)).WillOnce(Return(begin_failure));

  auto transaction = MakeReadWriteTransaction();
  auto first_commit = connection->Commit({transaction});
  EXPECT_THAT(first_commit, StatusIs(StatusCode::kInvalidArgument,
                                     HasSubstr("BeginTransaction failed")));

  // A second attempt on the same transaction should report the same error
  // without issuing another `BeginTransaction` call (the mock above allows
  // only one).
  auto second_commit = connection->Commit({transaction});
  EXPECT_THAT(second_commit, StatusIs(StatusCode::kInvalidArgument,
                                      HasSubstr("BeginTransaction failed")));
}
1661 
// The `Commit()` RPC is checked for the expected session and transaction id
// and fails once with `kPermissionDenied`; the test expects that error to be
// returned to the caller.
TEST(ConnectionImplTest, CommitCommitPermanentFailure) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();

  spanner_proto::Transaction begin_response = MakeTestTransaction();
  Database database("dummy_project", "dummy_instance", "dummy_database_id");
  auto connection = MakeLimitedRetryConnection(database, stub);
  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(
          [&database](grpc::ClientContext&,
                      spanner_proto::BatchCreateSessionsRequest const& req) {
            EXPECT_EQ(database.FullName(), req.database());
            return MakeSessionsResponse({"test-session-name"});
          });
  EXPECT_CALL(*stub, BeginTransaction(_, _)).WillOnce(Return(begin_response));
  EXPECT_CALL(*stub, Commit(_, _))
      .WillOnce([&begin_response](grpc::ClientContext&,
                                  spanner_proto::CommitRequest const& req) {
        EXPECT_EQ("test-session-name", req.session());
        EXPECT_EQ(begin_response.id(), req.transaction_id());
        return Status(StatusCode::kPermissionDenied, "uh-oh in Commit");
      });

  auto commit_result = connection->Commit({MakeReadWriteTransaction()});
  EXPECT_THAT(commit_result, StatusIs(StatusCode::kPermissionDenied,
                                      HasSubstr("uh-oh in Commit")));
}
1687 
// The `Commit()` RPC is checked for the expected session and transaction id
// and fails once with `kPermissionDenied`; the test expects that error to be
// returned to the caller.
//
// NOTE(review): despite the "TooManyTransientFailures" name, the mocked
// `Commit()` below returns a single permanent error, so this test exercises
// the same scenario as `CommitCommitPermanentFailure`. It probably should
// return `kUnavailable` repeatedly with `.Times(AtLeast(2))` -- confirm the
// retry behavior of `Commit()` before changing the expectations.
TEST(ConnectionImplTest, CommitCommitTooManyTransientFailures) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();

  spanner_proto::Transaction begin_response = MakeTestTransaction();
  Database database("dummy_project", "dummy_instance", "dummy_database_id");
  auto connection = MakeLimitedRetryConnection(database, stub);
  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(
          [&database](grpc::ClientContext&,
                      spanner_proto::BatchCreateSessionsRequest const& req) {
            EXPECT_EQ(database.FullName(), req.database());
            return MakeSessionsResponse({"test-session-name"});
          });
  EXPECT_CALL(*stub, BeginTransaction(_, _)).WillOnce(Return(begin_response));
  EXPECT_CALL(*stub, Commit(_, _))
      .WillOnce([&begin_response](grpc::ClientContext&,
                                  spanner_proto::CommitRequest const& req) {
        EXPECT_EQ("test-session-name", req.session());
        EXPECT_EQ(begin_response.id(), req.transaction_id());
        return Status(StatusCode::kPermissionDenied, "uh-oh in Commit");
      });

  auto commit_result = connection->Commit({MakeReadWriteTransaction()});
  EXPECT_THAT(commit_result, StatusIs(StatusCode::kPermissionDenied,
                                      HasSubstr("uh-oh in Commit")));
}
1713 
// Committing a transaction that was previously marked invalid must fail with
// the invalidating status, and must not issue any `BatchCreateSessions`,
// `BeginTransaction`, or `Commit` RPCs.
TEST(ConnectionImplTest, CommitCommitInvalidatedTransaction) {
  auto mock = std::make_shared<spanner_testing::MockSpannerStub>();

  auto db = Database("dummy_project", "dummy_instance", "dummy_database_id");
  auto conn = MakeConnection(
      db, {mock}, ConnectionOptions{grpc::InsecureChannelCredentials()});
  EXPECT_CALL(*mock, BatchCreateSessions(_, _)).Times(0);
  EXPECT_CALL(*mock, BeginTransaction(_, _)).Times(0);
  EXPECT_CALL(*mock, Commit(_, _)).Times(0);

  // Committing an invalidated transaction is a unilateral error.
  auto txn = MakeReadWriteTransaction();
  SetTransactionInvalid(txn,
                        Status(StatusCode::kAlreadyExists, "constraint error"));

  // A non-OK code implies `!commit.ok()`, so a single matcher covers the
  // failure, the code, and the message.
  auto commit = conn->Commit({txn});
  EXPECT_THAT(commit, StatusIs(StatusCode::kAlreadyExists,
                               HasSubstr("constraint error")));
}
1735 
// With a transaction id assigned up front, the `Commit()` RPC fails once with
// `kUnavailable` and then succeeds. The test expects an OK result carrying
// the commit timestamp from the second response.
TEST(ConnectionImplTest, CommitCommitIdempotentTransientSuccess) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();

  Database database("dummy_project", "dummy_instance", "dummy_database_id");
  auto connection = MakeConnection(
      database, {stub}, ConnectionOptions{grpc::InsecureChannelCredentials()});
  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(
          [&database](grpc::ClientContext&,
                      spanner_proto::BatchCreateSessionsRequest const& req) {
            EXPECT_EQ(database.FullName(), req.database());
            return MakeSessionsResponse({"test-session-name"});
          });

  auto const expected_timestamp =
      MakeTimestamp(std::chrono::system_clock::from_time_t(123)).value();
  EXPECT_CALL(*stub, Commit(_, _))
      .WillOnce(
          [](grpc::ClientContext&, spanner_proto::CommitRequest const& req) {
            EXPECT_EQ("test-session-name", req.session());
            EXPECT_EQ("test-txn-id", req.transaction_id());
            return Status(StatusCode::kUnavailable, "try-again");
          })
      .WillOnce([expected_timestamp](grpc::ClientContext&,
                                     spanner_proto::CommitRequest const& req) {
        EXPECT_EQ("test-session-name", req.session());
        EXPECT_EQ("test-txn-id", req.transaction_id());
        spanner_proto::CommitResponse commit_response;
        *commit_response.mutable_commit_timestamp() =
            internal::TimestampToProto(expected_timestamp);
        return commit_response;
      });

  // Assign the id up front, which makes the commit idempotent.
  auto transaction = MakeReadWriteTransaction();
  SetTransactionId(transaction, "test-txn-id");

  auto commit_result = connection->Commit({transaction});
  EXPECT_STATUS_OK(commit_result);
  EXPECT_EQ(expected_timestamp, commit_result->commit_timestamp);
}
1777 
// Commits a transaction whose id was assigned up front. The `Commit()` RPC is
// checked for the session name and that id, and succeeds on the first call.
TEST(ConnectionImplTest, CommitSuccessWithTransactionId) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();

  Database database("dummy_project", "dummy_instance", "dummy_database_id");
  auto connection = MakeConnection(
      database, {stub}, ConnectionOptions{grpc::InsecureChannelCredentials()});
  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(
          [&database](grpc::ClientContext&,
                      spanner_proto::BatchCreateSessionsRequest const& req) {
            EXPECT_EQ(database.FullName(), req.database());
            return MakeSessionsResponse({"test-session-name"});
          });
  EXPECT_CALL(*stub, Commit(_, _))
      .WillOnce(
          [](grpc::ClientContext&, spanner_proto::CommitRequest const& req) {
            EXPECT_EQ("test-session-name", req.session());
            EXPECT_EQ("test-txn-id", req.transaction_id());
            auto const timestamp =
                MakeTimestamp(std::chrono::system_clock::from_time_t(123))
                    .value();
            spanner_proto::CommitResponse commit_response;
            *commit_response.mutable_commit_timestamp() =
                internal::TimestampToProto(timestamp);
            return commit_response;
          });

  // Assign the id up front, which makes the commit idempotent.
  auto transaction = MakeReadWriteTransaction();
  SetTransactionId(transaction, "test-txn-id");

  auto commit_result = connection->Commit({transaction});
  EXPECT_STATUS_OK(commit_result);
}
1809 
// Session creation fails with `kPermissionDenied`. The test expects
// `Rollback()` to return that error and the `Rollback` RPC to never be
// issued.
TEST(ConnectionImplTest, RollbackGetSessionFailure) {
  Database database("project", "instance", "database");

  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(
          [&database](grpc::ClientContext&,
                      spanner_proto::BatchCreateSessionsRequest const& req) {
            EXPECT_EQ(database.FullName(), req.database());
            return Status(StatusCode::kPermissionDenied, "uh-oh in GetSession");
          });
  EXPECT_CALL(*stub, Rollback(_, _)).Times(0);

  auto connection = MakeConnection(
      database, {stub}, ConnectionOptions{grpc::InsecureChannelCredentials()});
  auto transaction = MakeReadWriteTransaction();
  SetTransactionId(transaction, "test-txn-id");
  auto rollback_status = connection->Rollback({transaction});
  EXPECT_THAT(rollback_status, StatusIs(StatusCode::kPermissionDenied,
                                        HasSubstr("uh-oh in GetSession")));
}
1831 
// Rolling back a transaction that has no id yet: the mocks expect one
// `BeginTransaction` call followed by a `Rollback` RPC that carries the
// session name and the id returned by `BeginTransaction`.
TEST(ConnectionImplTest, RollbackBeginTransaction) {
  auto db = Database("project", "instance", "database");
  std::string const session_name = "test-session-name";

  auto mock = std::make_shared<spanner_testing::MockSpannerStub>();
  EXPECT_CALL(*mock, BatchCreateSessions(_, _))
      .WillOnce([&db, &session_name](
                    grpc::ClientContext&,
                    spanner_proto::BatchCreateSessionsRequest const& request) {
        EXPECT_EQ(db.FullName(), request.database());
        return MakeSessionsResponse({session_name});
      });
  EXPECT_CALL(*mock, BeginTransaction(_, _))
      .WillOnce(Return(MakeTestTransaction("RollbackBeginTransaction")));
  EXPECT_CALL(*mock, Rollback(_, _))
      .WillOnce([&session_name](grpc::ClientContext&,
                                spanner_proto::RollbackRequest const& request) {
        // The rollback must reuse the session created above and the id
        // produced by the mocked `BeginTransaction`.
        EXPECT_EQ(session_name, request.session());
        EXPECT_EQ("RollbackBeginTransaction", request.transaction_id());
        return Status();
      });

  auto conn = MakeConnection(
      db, {mock}, ConnectionOptions{grpc::InsecureChannelCredentials()});
  auto txn = MakeReadWriteTransaction();
  auto rollback = conn->Rollback({txn});
  EXPECT_STATUS_OK(rollback);
}
1860 
// Rolling back a single-use transaction is rejected with `kInvalidArgument`
// and must not issue any `BatchCreateSessions` or `Rollback` RPCs.
TEST(ConnectionImplTest, RollbackSingleUseTransaction) {
  auto db = Database("project", "instance", "database");

  auto mock = std::make_shared<spanner_testing::MockSpannerStub>();
  EXPECT_CALL(*mock, BatchCreateSessions(_, _)).Times(0);
  EXPECT_CALL(*mock, Rollback(_, _)).Times(0);

  auto conn = MakeConnection(
      db, {mock}, ConnectionOptions{grpc::InsecureChannelCredentials()});
  auto txn = internal::MakeSingleUseTransaction(
      Transaction::SingleUseOptions{Transaction::ReadOnlyOptions{}});
  auto rollback = conn->Rollback({txn});
  EXPECT_THAT(rollback, StatusIs(StatusCode::kInvalidArgument,
                                 HasSubstr("Cannot rollback")));
}
1877 
// The `Rollback` RPC is checked for the expected session and transaction id
// and fails once with `kPermissionDenied`; the test expects that error from
// `Rollback()`.
TEST(ConnectionImplTest, RollbackPermanentFailure) {
  Database database("project", "instance", "database");
  std::string const expected_session = "test-session-name";
  std::string const expected_txn_id = "test-txn-id";

  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce([&database, &expected_session](
                    grpc::ClientContext&,
                    spanner_proto::BatchCreateSessionsRequest const& req) {
        EXPECT_EQ(database.FullName(), req.database());
        return MakeSessionsResponse({expected_session});
      });
  EXPECT_CALL(*stub, Rollback(_, _))
      .WillOnce([&expected_session, &expected_txn_id](
                    grpc::ClientContext&,
                    spanner_proto::RollbackRequest const& req) {
        EXPECT_EQ(expected_session, req.session());
        EXPECT_EQ(expected_txn_id, req.transaction_id());
        return Status(StatusCode::kPermissionDenied, "uh-oh in Rollback");
      });

  auto connection = MakeLimitedRetryConnection(database, stub);
  auto transaction = MakeReadWriteTransaction();
  SetTransactionId(transaction, expected_txn_id);
  auto rollback_status = connection->Rollback({transaction});
  EXPECT_THAT(rollback_status, StatusIs(StatusCode::kPermissionDenied,
                                        HasSubstr("uh-oh in Rollback")));
}
1907 
// The `Rollback` RPC is mocked to always fail with `kUnavailable`. The test
// expects at least two attempts and then the transient error returned from
// `Rollback()`.
TEST(ConnectionImplTest, RollbackTooManyTransientFailures) {
  Database database("project", "instance", "database");
  std::string const expected_session = "test-session-name";
  std::string const expected_txn_id = "test-txn-id";

  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce([&database, &expected_session](
                    grpc::ClientContext&,
                    spanner_proto::BatchCreateSessionsRequest const& req) {
        EXPECT_EQ(database.FullName(), req.database());
        return MakeSessionsResponse({expected_session});
      });
  EXPECT_CALL(*stub, Rollback(_, _))
      .Times(AtLeast(2))
      .WillRepeatedly([&expected_session, &expected_txn_id](
                          grpc::ClientContext&,
                          spanner_proto::RollbackRequest const& req) {
        EXPECT_EQ(expected_session, req.session());
        EXPECT_EQ(expected_txn_id, req.transaction_id());
        return Status(StatusCode::kUnavailable, "try-again in Rollback");
      });

  auto connection = MakeLimitedRetryConnection(database, stub);
  auto transaction = MakeReadWriteTransaction();
  SetTransactionId(transaction, expected_txn_id);
  auto rollback_status = connection->Rollback({transaction});
  EXPECT_THAT(rollback_status, StatusIs(StatusCode::kUnavailable,
                                        HasSubstr("try-again in Rollback")));
}
1938 
// The `Rollback` RPC fails once with `kUnavailable` and then succeeds; the
// second call is checked for the expected session and transaction id. The
// test expects `Rollback()` to return OK.
TEST(ConnectionImplTest, RollbackSuccess) {
  Database database("project", "instance", "database");
  std::string const expected_session = "test-session-name";
  std::string const expected_txn_id = "test-txn-id";

  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce([&database, &expected_session](
                    grpc::ClientContext&,
                    spanner_proto::BatchCreateSessionsRequest const& req) {
        EXPECT_EQ(database.FullName(), req.database());
        return MakeSessionsResponse({expected_session});
      });
  EXPECT_CALL(*stub, Rollback(_, _))
      .WillOnce(Return(Status(StatusCode::kUnavailable, "try-again")))
      .WillOnce([&expected_session, &expected_txn_id](
                    grpc::ClientContext&,
                    spanner_proto::RollbackRequest const& req) {
        EXPECT_EQ(expected_session, req.session());
        EXPECT_EQ(expected_txn_id, req.transaction_id());
        return Status();
      });

  auto connection = MakeConnection(
      database, {stub}, ConnectionOptions{grpc::InsecureChannelCredentials()});
  auto transaction = MakeReadWriteTransaction();
  SetTransactionId(transaction, expected_txn_id);
  auto rollback_status = connection->Rollback({transaction});
  EXPECT_STATUS_OK(rollback_status);
}
1969 
// Rolling back a transaction that was previously marked invalid must not
// issue any `BatchCreateSessions` or `Rollback` RPCs. The test expects
// `Rollback()` to return the status that invalidated the transaction.
TEST(ConnectionImplTest, RollbackInvalidatedTransaction) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();

  Database database("dummy_project", "dummy_instance", "dummy_database_id");
  auto connection = MakeConnection(
      database, {stub}, ConnectionOptions{grpc::InsecureChannelCredentials()});
  EXPECT_CALL(*stub, BatchCreateSessions(_, _)).Times(0);
  EXPECT_CALL(*stub, Rollback(_, _)).Times(0);

  // Invalidate the transaction locally; the rollback is then resolved without
  // any RPC and reports the invalidating status.
  auto transaction = MakeReadWriteTransaction();
  Status const invalidating(StatusCode::kAlreadyExists, "constraint error");
  SetTransactionInvalid(transaction, invalidating);

  auto rollback_status = connection->Rollback({transaction});
  EXPECT_THAT(rollback_status, StatusIs(StatusCode::kAlreadyExists,
                                        HasSubstr("constraint error")));
}
1988 
// Exercises the happy path of `PartitionRead()`: the RPC must receive the
// expected request proto, fails once with `kUnavailable`, and then returns
// two partition tokens plus a transaction id. The test expects the
// transaction to pick up the session and transaction id, and the returned
// partitions to match the tokens from the response (in any order).
TEST(ConnectionImplTest, PartitionReadSuccess) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  Database database("dummy_project", "dummy_instance", "dummy_database_id");
  auto connection = MakeConnection(
      database, {stub}, ConnectionOptions{grpc::InsecureChannelCredentials()});
  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(
          [&database](grpc::ClientContext&,
                      spanner_proto::BatchCreateSessionsRequest const& req) {
            EXPECT_EQ(database.FullName(), req.database());
            return MakeSessionsResponse({"test-session-name"});
          });

  // The response the mocked RPC returns on its second invocation.
  auto constexpr kResponseText = R"pb(
    partitions: { partition_token: "BADDECAF" }
    partitions: { partition_token: "DEADBEEF" }
    transaction: { id: "CAFEDEAD" }
  )pb";
  google::spanner::v1::PartitionResponse rpc_response;
  ASSERT_TRUE(TextFormat::ParseFromString(kResponseText, &rpc_response));

  // The request the connection is expected to send.
  auto constexpr kRequestText = R"pb(
    session: "test-session-name"
    transaction: {
      begin { read_only { strong: true return_read_timestamp: true } }
    }
    table: "table"
    columns: "UserId"
    columns: "UserName"
    key_set: { all: true }
    partition_options: {}
  )pb";
  google::spanner::v1::PartitionReadRequest expected_request;
  ASSERT_TRUE(TextFormat::ParseFromString(kRequestText, &expected_request));

  EXPECT_CALL(*stub, PartitionRead(_, IsProtoEqual(expected_request)))
      .WillOnce(Return(Status(StatusCode::kUnavailable, "try-again")))
      .WillOnce(Return(rpc_response));

  Transaction transaction =
      MakeReadOnlyTransaction(Transaction::ReadOnlyOptions());
  StatusOr<std::vector<ReadPartition>> partitions = connection->PartitionRead(
      {{transaction, "table", KeySet::All(), {"UserId", "UserName"}}});
  ASSERT_STATUS_OK(partitions);
  EXPECT_THAT(transaction,
              HasSessionAndTransactionId("test-session-name", "CAFEDEAD"));

  // One expected partition per token in the mocked response.
  auto const expected_partition = [](char const* token) {
    return internal::MakeReadPartition("CAFEDEAD", "test-session-name", token,
                                       "table", KeySet::All(),
                                       {"UserId", "UserName"});
  };
  std::vector<ReadPartition> expected = {expected_partition("BADDECAF"),
                                         expected_partition("DEADBEEF")};

  EXPECT_THAT(*partitions,
              testing::UnorderedPointwise(testing::Eq(), expected));
}
2049 
/**
 * @test Verify a permanent PartitionRead() error is reported to the caller.
 *
 * The stub is scripted, in order, to create a session, fail PartitionRead
 * with `kPermissionDenied`, serve one BeginTransaction, and fail a second
 * PartitionRead with the same status.
 */
TEST(ConnectionImplTest, PartitionReadPermanentFailure) {
  Database db("dummy_project", "dummy_instance", "dummy_database_id");
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  auto conn = MakeLimitedRetryConnection(db, stub);

  Status const denied(StatusCode::kPermissionDenied, "uh-oh");
  {
    InSequence ordered;
    EXPECT_CALL(*stub, BatchCreateSessions(_, _))
        .WillOnce([&db](grpc::ClientContext&,
                        spanner_proto::BatchCreateSessionsRequest const& req) {
          EXPECT_EQ(db.FullName(), req.database());
          return MakeSessionsResponse({"test-session-name"});
        });
    EXPECT_CALL(*stub, PartitionRead(_, _)).WillOnce(Return(denied));
    EXPECT_CALL(*stub, BeginTransaction(_, _))
        .WillOnce(Return(MakeTestTransaction()));
    EXPECT_CALL(*stub, PartitionRead(_, _)).WillOnce(Return(denied));
  }

  auto result = conn->PartitionRead(
      {{MakeReadOnlyTransaction(Transaction::ReadOnlyOptions()),
        "table",
        KeySet::All(),
        {"UserId", "UserName"}}});
  // Both the status code and the original message must survive.
  EXPECT_THAT(result,
              StatusIs(StatusCode::kPermissionDenied, HasSubstr("uh-oh")));
}
2082 
/**
 * @test Verify PartitionRead() stops retrying when transient errors persist.
 *
 * Every PartitionRead attempt returns `kUnavailable`. The stub expects at
 * least two attempts, then one BeginTransaction, then at least two more
 * attempts; the caller must finally see the `kUnavailable` status.
 */
TEST(ConnectionImplTest, PartitionReadTooManyTransientFailures) {
  Database db("dummy_project", "dummy_instance", "dummy_database_id");
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  auto conn = MakeLimitedRetryConnection(db, stub);

  Status const unavailable(StatusCode::kUnavailable, "try-again");
  {
    InSequence ordered;
    EXPECT_CALL(*stub, BatchCreateSessions(_, _))
        .WillOnce([&db](grpc::ClientContext&,
                        spanner_proto::BatchCreateSessionsRequest const& req) {
          EXPECT_EQ(db.FullName(), req.database());
          return MakeSessionsResponse({"test-session-name"});
        });
    EXPECT_CALL(*stub, PartitionRead(_, _))
        .Times(AtLeast(2))
        .WillRepeatedly(Return(unavailable));
    EXPECT_CALL(*stub, BeginTransaction(_, _))
        .WillOnce(Return(MakeTestTransaction()));
    EXPECT_CALL(*stub, PartitionRead(_, _))
        .Times(AtLeast(2))
        .WillRepeatedly(Return(unavailable));
  }

  auto result = conn->PartitionRead(
      {{MakeReadOnlyTransaction(Transaction::ReadOnlyOptions()),
        "table",
        KeySet::All(),
        {"UserId", "UserName"}}});
  EXPECT_THAT(result,
              StatusIs(StatusCode::kUnavailable, HasSubstr("try-again")));
}
2117 
/**
 * @test Verify PartitionQuery() succeeds after a single transient failure.
 *
 * The stub answers the first PartitionQuery with `kUnavailable` and the
 * second with two partition tokens; the test expects one `QueryPartition`
 * per token, in any order.
 */
TEST(ConnectionImplTest, PartitionQuerySuccess) {
  Database db("dummy_project", "dummy_instance", "dummy_database_id");
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  auto conn = MakeConnection(
      db, {stub}, ConnectionOptions{grpc::InsecureChannelCredentials()});
  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce([&db](grpc::ClientContext&,
                      spanner_proto::BatchCreateSessionsRequest const& req) {
        EXPECT_EQ(db.FullName(), req.database());
        return MakeSessionsResponse({"test-session-name"});
      });

  // The request that every PartitionQuery attempt is matched against.
  google::spanner::v1::PartitionQueryRequest expected_request;
  auto constexpr kRequestText = R"pb(
    session: "test-session-name"
    transaction: {
      begin { read_only { strong: true return_read_timestamp: true } }
    }
    sql: "select * from table"
    params: {}
    partition_options: {}
  )pb";
  ASSERT_TRUE(TextFormat::ParseFromString(kRequestText, &expected_request));

  // The response handed back on the second attempt.
  google::spanner::v1::PartitionResponse success;
  auto constexpr kResponseText = R"pb(
    partitions: { partition_token: "BADDECAF" }
    partitions: { partition_token: "DEADBEEF" }
    transaction: { id: "CAFEDEAD" }
  )pb";
  ASSERT_TRUE(TextFormat::ParseFromString(kResponseText, &success));

  EXPECT_CALL(*stub, PartitionQuery(_, IsProtoEqual(expected_request)))
      .WillOnce(Return(Status(StatusCode::kUnavailable, "try-again")))
      .WillOnce(Return(success));

  SqlStatement statement("select * from table");
  auto result = conn->PartitionQuery(
      {MakeReadOnlyTransaction(Transaction::ReadOnlyOptions()), statement});
  ASSERT_STATUS_OK(result);

  // One expected partition per token in the response.
  std::vector<QueryPartition> expected;
  for (auto const* token : {"BADDECAF", "DEADBEEF"}) {
    expected.push_back(internal::MakeQueryPartition(
        "CAFEDEAD", "test-session-name", token, statement));
  }
  EXPECT_THAT(*result, testing::UnorderedPointwise(testing::Eq(), expected));
}
2172 
/**
 * @test Verify a permanent PartitionQuery() error is reported to the caller.
 *
 * The stub is scripted, in order, to create a session, fail PartitionQuery
 * with `kPermissionDenied`, serve one BeginTransaction, and fail a second
 * PartitionQuery with the same status.
 */
TEST(ConnectionImplTest, PartitionQueryPermanentFailure) {
  Database db("dummy_project", "dummy_instance", "dummy_database_id");
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  auto conn = MakeLimitedRetryConnection(db, stub);

  Status const denied(StatusCode::kPermissionDenied, "End of line.");
  {
    InSequence ordered;
    EXPECT_CALL(*stub, BatchCreateSessions(_, _))
        .WillOnce([&db](grpc::ClientContext&,
                        spanner_proto::BatchCreateSessionsRequest const& req) {
          EXPECT_EQ(db.FullName(), req.database());
          return MakeSessionsResponse({"test-session-name"});
        });
    EXPECT_CALL(*stub, PartitionQuery(_, _)).WillOnce(Return(denied));
    EXPECT_CALL(*stub, BeginTransaction(_, _))
        .WillOnce(Return(MakeTestTransaction()));
    EXPECT_CALL(*stub, PartitionQuery(_, _)).WillOnce(Return(denied));
  }

  auto result = conn->PartitionQuery(
      {MakeReadOnlyTransaction(Transaction::ReadOnlyOptions()),
       SqlStatement("select * from table")});
  auto const& actual = result.status();
  EXPECT_EQ(StatusCode::kPermissionDenied, actual.code());
  EXPECT_THAT(actual.message(), HasSubstr(denied.message()));
}
2203 
/**
 * @test Verify PartitionQuery() stops retrying when transient errors persist.
 *
 * Every PartitionQuery attempt returns `kUnavailable`. The stub expects at
 * least two attempts, then one BeginTransaction, then at least two more
 * attempts; the caller must finally see the `kUnavailable` status.
 */
TEST(ConnectionImplTest, PartitionQueryTooManyTransientFailures) {
  Database db("dummy_project", "dummy_instance", "dummy_database_id");
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  auto conn = MakeLimitedRetryConnection(db, stub);

  Status const unavailable(StatusCode::kUnavailable,
                           "try-again in PartitionQuery");
  {
    InSequence ordered;
    EXPECT_CALL(*stub, BatchCreateSessions(_, _))
        .WillOnce([&db](grpc::ClientContext&,
                        spanner_proto::BatchCreateSessionsRequest const& req) {
          EXPECT_EQ(db.FullName(), req.database());
          return MakeSessionsResponse({"test-session-name"});
        });
    EXPECT_CALL(*stub, PartitionQuery(_, _))
        .Times(AtLeast(2))
        .WillRepeatedly(Return(unavailable));
    EXPECT_CALL(*stub, BeginTransaction(_, _))
        .WillOnce(Return(MakeTestTransaction()));
    EXPECT_CALL(*stub, PartitionQuery(_, _))
        .Times(AtLeast(2))
        .WillRepeatedly(Return(unavailable));
  }

  auto result = conn->PartitionQuery(
      {MakeReadOnlyTransaction(Transaction::ReadOnlyOptions()),
       SqlStatement("select * from table")});
  auto const& actual = result.status();
  EXPECT_EQ(StatusCode::kUnavailable, actual.code());
  EXPECT_THAT(actual.message(), HasSubstr(unavailable.message()));
}
2237 
TEST(ConnectionImplTest,MultipleThreads)2238 TEST(ConnectionImplTest, MultipleThreads) {
2239   auto db = Database("project", "instance", "database");
2240   std::string const session_prefix = "test-session-prefix-";
2241   std::string const transaction_id = "test-txn-id";
2242   std::atomic<int> session_counter(0);
2243 
2244   auto mock = std::make_shared<spanner_testing::MockSpannerStub>();
2245   EXPECT_CALL(*mock, BatchCreateSessions(_, _))
2246       .WillRepeatedly(
2247           [&db, &session_prefix, &session_counter](
2248               grpc::ClientContext&,
2249               spanner_proto::BatchCreateSessionsRequest const& request) {
2250             EXPECT_EQ(db.FullName(), request.database());
2251             spanner_proto::BatchCreateSessionsResponse response;
2252             for (int i = 0; i < request.session_count(); ++i) {
2253               response.add_session()->set_name(
2254                   session_prefix + std::to_string(++session_counter));
2255             }
2256             return response;
2257           });
2258   EXPECT_CALL(*mock, Rollback(_, _))
2259       .WillRepeatedly(
2260           [session_prefix](grpc::ClientContext&,
2261                            spanner_proto::RollbackRequest const& request) {
2262             EXPECT_THAT(request.session(), StartsWith(session_prefix));
2263             return Status();
2264           });
2265 
2266   auto conn = MakeConnection(
2267       db, {mock}, ConnectionOptions{grpc::InsecureChannelCredentials()});
2268 
2269   int const per_thread_iterations = 1000;
2270   auto const thread_count = []() -> unsigned {
2271     if (std::thread::hardware_concurrency() == 0) {
2272       return 16;
2273     }
2274     return std::thread::hardware_concurrency();
2275   }();
2276 
2277   auto runner = [](int thread_id, int iterations, Connection* conn) {
2278     for (int i = 0; i != iterations; ++i) {
2279       auto txn = MakeReadWriteTransaction();
2280       SetTransactionId(
2281           txn, "txn-" + std::to_string(thread_id) + ":" + std::to_string(i));
2282       auto rollback = conn->Rollback({txn});
2283       EXPECT_STATUS_OK(rollback);
2284     }
2285   };
2286 
2287   std::vector<std::future<void>> tasks;
2288   for (unsigned i = 0; i != thread_count; ++i) {
2289     tasks.push_back(std::async(std::launch::async, runner, i,
2290                                per_thread_iterations, conn.get()));
2291   }
2292 
2293   for (auto& f : tasks) {
2294     f.get();
2295   }
2296 }
2297 
2298 /**
2299  * @test Verify Transactions remain bound to a single Session.
2300  *
2301  * This test makes interleaved Read() calls using two separate Transactions,
2302  * and ensures each Transaction uses the same session consistently.
2303  */
TEST(ConnectionImplTest,TransactionSessionBinding)2304 TEST(ConnectionImplTest, TransactionSessionBinding) {
2305   auto mock = std::make_shared<spanner_testing::MockSpannerStub>();
2306 
2307   auto db = Database("dummy_project", "dummy_instance", "dummy_database_id");
2308   auto conn = MakeConnection(
2309       db, {mock}, ConnectionOptions{grpc::InsecureChannelCredentials()});
2310   EXPECT_CALL(*mock,
2311               BatchCreateSessions(_, BatchCreateSessionsRequestHasDatabase(db)))
2312       .WillOnce(Return(MakeSessionsResponse({"session-1"})))
2313       .WillOnce(Return(MakeSessionsResponse({"session-2"})));
2314 
2315   constexpr int kNumResponses = 4;
2316   std::array<spanner_proto::PartialResultSet, kNumResponses> responses;
2317   std::array<std::unique_ptr<MockGrpcReader>, kNumResponses> readers;
2318   for (int i = 0; i < kNumResponses; ++i) {
2319     auto constexpr kText = R"pb(
2320       metadata: {
2321         row_type: {
2322           fields: {
2323             name: "Number",
2324             type: { code: INT64 }
2325           }
2326         }
2327       }
2328     )pb";
2329     ASSERT_TRUE(TextFormat::ParseFromString(kText, &responses[i]));
2330     // The first two responses are reads from two different "begin"
2331     // transactions.
2332     switch (i) {
2333       case 0:
2334         *responses[i].mutable_metadata()->mutable_transaction() =
2335             MakeTestTransaction("ABCDEF01");
2336         break;
2337       case 1:
2338         *responses[i].mutable_metadata()->mutable_transaction() =
2339             MakeTestTransaction("ABCDEF02");
2340         break;
2341     }
2342     responses[i].add_values()->set_string_value(std::to_string(i));
2343 
2344     readers[i] = absl::make_unique<MockGrpcReader>();
2345     EXPECT_CALL(*readers[i], Read(_))
2346         .WillOnce(DoAll(SetArgPointee<0>(responses[i]), Return(true)))
2347         .WillOnce(Return(false));
2348     EXPECT_CALL(*readers[i], Finish()).WillOnce(Return(grpc::Status()));
2349   }
2350 
2351   // Ensure the StreamingRead calls have the expected session and transaction
2352   // IDs or "begin" set as appropriate.
2353   {
2354     InSequence s;
2355     EXPECT_CALL(
2356         *mock,
2357         StreamingRead(_, ReadRequestHasSessionAndBeginTransaction("session-1")))
2358         .WillOnce(Return(ByMove(std::move(readers[0]))));
2359     EXPECT_CALL(
2360         *mock,
2361         StreamingRead(_, ReadRequestHasSessionAndBeginTransaction("session-2")))
2362         .WillOnce(Return(ByMove(std::move(readers[1]))));
2363     EXPECT_CALL(*mock, StreamingRead(_, ReadRequestHasSessionAndTransactionId(
2364                                             "session-1", "ABCDEF01")))
2365         .WillOnce(Return(ByMove(std::move(readers[2]))));
2366     EXPECT_CALL(*mock, StreamingRead(_, ReadRequestHasSessionAndTransactionId(
2367                                             "session-2", "ABCDEF02")))
2368         .WillOnce(Return(ByMove(std::move(readers[3]))));
2369   }
2370 
2371   // Now do the actual reads and verify the results.
2372   Transaction txn1 = MakeReadOnlyTransaction(Transaction::ReadOnlyOptions());
2373   auto rows = conn->Read({txn1, "table", KeySet::All(), {"Number"}});
2374   EXPECT_THAT(txn1, HasSessionAndTransactionId("session-1", "ABCDEF01"));
2375   for (auto& row : StreamOf<std::tuple<std::int64_t>>(rows)) {
2376     EXPECT_STATUS_OK(row);
2377     EXPECT_EQ(std::get<0>(*row), 0);
2378   }
2379 
2380   Transaction txn2 = MakeReadOnlyTransaction(Transaction::ReadOnlyOptions());
2381   rows = conn->Read({txn2, "table", KeySet::All(), {"Number"}});
2382   EXPECT_THAT(txn2, HasSessionAndTransactionId("session-2", "ABCDEF02"));
2383   for (auto& row : StreamOf<std::tuple<std::int64_t>>(rows)) {
2384     EXPECT_STATUS_OK(row);
2385     EXPECT_EQ(std::get<0>(*row), 1);
2386   }
2387 
2388   rows = conn->Read({txn1, "table", KeySet::All(), {"Number"}});
2389   EXPECT_THAT(txn1, HasSessionAndTransactionId("session-1", "ABCDEF01"));
2390   for (auto& row : StreamOf<std::tuple<std::int64_t>>(rows)) {
2391     EXPECT_STATUS_OK(row);
2392     EXPECT_EQ(std::get<0>(*row), 2);
2393   }
2394 
2395   rows = conn->Read({txn2, "table", KeySet::All(), {"Number"}});
2396   EXPECT_THAT(txn2, HasSessionAndTransactionId("session-2", "ABCDEF02"));
2397   for (auto& row : StreamOf<std::tuple<std::int64_t>>(rows)) {
2398     EXPECT_STATUS_OK(row);
2399     EXPECT_EQ(std::get<0>(*row), 3);
2400   }
2401 }
2402 
2403 /**
2404  * @test Verify if a `Transaction` outlives the `ConnectionImpl` it was used
2405  * with, it does not call back into the deleted `ConnectionImpl` to release
2406  * the associated `Session` (which would be detected in asan/msan builds.)
2407  */
TEST(ConnectionImplTest,TransactionOutlivesConnection)2408 TEST(ConnectionImplTest, TransactionOutlivesConnection) {
2409   auto mock = std::make_shared<spanner_testing::MockSpannerStub>();
2410 
2411   auto db = Database("dummy_project", "dummy_instance", "dummy_database_id");
2412   auto conn = MakeConnection(
2413       db, {mock}, ConnectionOptions{grpc::InsecureChannelCredentials()});
2414   EXPECT_CALL(*mock, BatchCreateSessions(_, _))
2415       .WillOnce(
2416           [&db](grpc::ClientContext&,
2417                 spanner_proto::BatchCreateSessionsRequest const& request) {
2418             EXPECT_EQ(db.FullName(), request.database());
2419             return MakeSessionsResponse({"test-session-name"});
2420           });
2421 
2422   auto grpc_reader = absl::make_unique<MockGrpcReader>();
2423   auto constexpr kText = R"pb(metadata: { transaction: { id: "ABCDEF00" } })pb";
2424   spanner_proto::PartialResultSet response;
2425   ASSERT_TRUE(TextFormat::ParseFromString(kText, &response));
2426   EXPECT_CALL(*grpc_reader, Read(_))
2427       .WillOnce(DoAll(SetArgPointee<0>(response), Return(true)));
2428   EXPECT_CALL(*grpc_reader, Finish()).WillOnce(Return(grpc::Status()));
2429   EXPECT_CALL(*mock, StreamingRead(_, _))
2430       .WillOnce(Return(ByMove(std::move(grpc_reader))));
2431 
2432   Transaction txn = MakeReadOnlyTransaction(Transaction::ReadOnlyOptions());
2433   auto rows = conn->Read({txn, "table", KeySet::All(), {"UserId", "UserName"}});
2434   EXPECT_THAT(txn, HasSessionAndTransactionId("test-session-name", "ABCDEF00"));
2435 
2436   // `conn` is the only reference to the `ConnectionImpl`, so dropping it will
2437   // cause the `ConnectionImpl` object to be deleted, while `txn` and its
2438   // associated `Session` continues to live on.
2439   conn.reset();
2440 }
2441 
/**
 * @test Verify Read() surfaces a "Session not found" error from the stream
 * and leaves the transaction with a bad session.
 */
TEST(ConnectionImplTest, ReadSessionNotFound) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(Return(MakeSessionsResponse({"test-session-name"})));

  // The stream yields no data and finishes with NOT_FOUND.
  auto reader = absl::make_unique<MockGrpcReader>();
  EXPECT_CALL(*reader, Read(_)).WillOnce(Return(false));
  EXPECT_CALL(*reader, Finish())
      .WillOnce(Return(
          grpc::Status(grpc::StatusCode::NOT_FOUND, "Session not found")));
  EXPECT_CALL(*stub, StreamingRead(_, _))
      .WillOnce(Return(ByMove(std::move(reader))));

  auto db = Database("project", "instance", "database");
  auto conn = MakeLimitedRetryConnection(db, stub);
  auto txn = MakeReadWriteTransaction();
  SetTransactionId(txn, "test-txn-id");

  auto row = GetSingularRow(conn->Read(Connection::ReadParams{txn}));
  EXPECT_FALSE(row.ok());
  auto const error = row.status();
  EXPECT_TRUE(IsSessionNotFound(error)) << error;
  EXPECT_THAT(txn, HasBadSession());
}
2467 
/**
 * @test Verify PartitionRead() surfaces a "Session not found" error and
 * leaves the transaction with a bad session.
 */
TEST(ConnectionImplTest, PartitionReadSessionNotFound) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(Return(MakeSessionsResponse({"test-session-name"})));
  EXPECT_CALL(*stub, PartitionRead(_, _))
      .WillOnce(Return(Status(StatusCode::kNotFound, "Session not found")));

  auto db = Database("project", "instance", "database");
  auto conn = MakeLimitedRetryConnection(db, stub);
  auto txn = MakeReadWriteTransaction();
  SetTransactionId(txn, "test-txn-id");

  auto partitions = conn->PartitionRead({Connection::ReadParams{txn}});
  EXPECT_FALSE(partitions.ok());
  auto const error = partitions.status();
  EXPECT_TRUE(IsSessionNotFound(error)) << error;
  EXPECT_THAT(txn, HasBadSession());
}
2492 
/**
 * @test Verify ExecuteQuery() surfaces a "Session not found" error from the
 * stream and leaves the transaction with a bad session.
 */
TEST(ConnectionImplTest, ExecuteQuerySessionNotFound) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(Return(MakeSessionsResponse({"test-session-name"})));

  // The stream yields no data and finishes with NOT_FOUND.
  auto reader = absl::make_unique<MockGrpcReader>();
  EXPECT_CALL(*reader, Read(_)).WillOnce(Return(false));
  EXPECT_CALL(*reader, Finish())
      .WillOnce(Return(
          grpc::Status(grpc::StatusCode::NOT_FOUND, "Session not found")));
  EXPECT_CALL(*stub, ExecuteStreamingSql(_, _))
      .WillOnce(Return(ByMove(std::move(reader))));

  auto db = Database("project", "instance", "database");
  auto conn = MakeLimitedRetryConnection(db, stub);
  auto txn = MakeReadWriteTransaction();
  SetTransactionId(txn, "test-txn-id");

  auto row = GetSingularRow(conn->ExecuteQuery({txn}));
  EXPECT_FALSE(row.ok());
  auto const error = row.status();
  EXPECT_TRUE(IsSessionNotFound(error)) << error;
  EXPECT_THAT(txn, HasBadSession());
}
2517 
/**
 * @test Verify ProfileQuery() surfaces a "Session not found" error from the
 * stream and leaves the transaction with a bad session.
 */
TEST(ConnectionImplTest, ProfileQuerySessionNotFound) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(Return(MakeSessionsResponse({"test-session-name"})));

  // The stream yields no data and finishes with NOT_FOUND.
  auto reader = absl::make_unique<MockGrpcReader>();
  EXPECT_CALL(*reader, Read(_)).WillOnce(Return(false));
  EXPECT_CALL(*reader, Finish())
      .WillOnce(Return(
          grpc::Status(grpc::StatusCode::NOT_FOUND, "Session not found")));
  EXPECT_CALL(*stub, ExecuteStreamingSql(_, _))
      .WillOnce(Return(ByMove(std::move(reader))));

  auto db = Database("project", "instance", "database");
  auto conn = MakeLimitedRetryConnection(db, stub);
  auto txn = MakeReadWriteTransaction();
  SetTransactionId(txn, "test-txn-id");

  auto row = GetSingularRow(conn->ProfileQuery({txn}));
  EXPECT_FALSE(row.ok());
  auto const error = row.status();
  EXPECT_TRUE(IsSessionNotFound(error)) << error;
  EXPECT_THAT(txn, HasBadSession());
}
2542 
/**
 * @test Verify ExecuteDml() surfaces a "Session not found" error and leaves
 * the transaction with a bad session.
 */
TEST(ConnectionImplTest, ExecuteDmlSessionNotFound) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(Return(MakeSessionsResponse({"test-session-name"})));
  EXPECT_CALL(*stub, ExecuteSql(_, _))
      .WillOnce(Return(Status(StatusCode::kNotFound, "Session not found")));

  auto db = Database("project", "instance", "database");
  auto conn = MakeLimitedRetryConnection(db, stub);
  auto txn = MakeReadWriteTransaction();
  SetTransactionId(txn, "test-txn-id");

  auto result = conn->ExecuteDml({txn});
  EXPECT_FALSE(result.ok());
  auto const error = result.status();
  EXPECT_TRUE(IsSessionNotFound(error)) << error;
  EXPECT_THAT(txn, HasBadSession());
}
2566 
/**
 * @test Verify ProfileDml() surfaces a "Session not found" error and leaves
 * the transaction with a bad session.
 */
TEST(ConnectionImplTest, ProfileDmlSessionNotFound) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(Return(MakeSessionsResponse({"test-session-name"})));
  EXPECT_CALL(*stub, ExecuteSql(_, _))
      .WillOnce(Return(Status(StatusCode::kNotFound, "Session not found")));

  auto db = Database("project", "instance", "database");
  auto conn = MakeLimitedRetryConnection(db, stub);
  auto txn = MakeReadWriteTransaction();
  SetTransactionId(txn, "test-txn-id");

  auto result = conn->ProfileDml({txn});
  EXPECT_FALSE(result.ok());
  auto const error = result.status();
  EXPECT_TRUE(IsSessionNotFound(error)) << error;
  EXPECT_THAT(txn, HasBadSession());
}
2590 
/**
 * @test Verify AnalyzeSql() surfaces a "Session not found" error and leaves
 * the transaction with a bad session.
 */
TEST(ConnectionImplTest, AnalyzeSqlSessionNotFound) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(Return(MakeSessionsResponse({"test-session-name"})));
  EXPECT_CALL(*stub, ExecuteSql(_, _))
      .WillOnce(Return(Status(StatusCode::kNotFound, "Session not found")));

  auto db = Database("project", "instance", "database");
  auto conn = MakeLimitedRetryConnection(db, stub);
  auto txn = MakeReadWriteTransaction();
  SetTransactionId(txn, "test-txn-id");

  auto result = conn->AnalyzeSql({txn});
  EXPECT_FALSE(result.ok());
  auto const error = result.status();
  EXPECT_TRUE(IsSessionNotFound(error)) << error;
  EXPECT_THAT(txn, HasBadSession());
}
2614 
/**
 * @test Verify PartitionQuery() surfaces a "Session not found" error and
 * leaves the transaction with a bad session.
 */
TEST(ConnectionImplTest, PartitionQuerySessionNotFound) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(Return(MakeSessionsResponse({"test-session-name"})));
  EXPECT_CALL(*stub, PartitionQuery(_, _))
      .WillOnce(Return(Status(StatusCode::kNotFound, "Session not found")));

  auto db = Database("project", "instance", "database");
  auto conn = MakeLimitedRetryConnection(db, stub);
  auto txn = MakeReadWriteTransaction();
  SetTransactionId(txn, "test-txn-id");

  auto partitions = conn->PartitionQuery({txn});
  EXPECT_FALSE(partitions.ok());
  auto const error = partitions.status();
  EXPECT_TRUE(IsSessionNotFound(error)) << error;
  EXPECT_THAT(txn, HasBadSession());
}
2638 
/**
 * @test Verify ExecuteBatchDml() surfaces a "Session not found" error and
 * leaves the transaction with a bad session.
 */
TEST(ConnectionImplTest, ExecuteBatchDmlSessionNotFound) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(Return(MakeSessionsResponse({"test-session-name"})));
  EXPECT_CALL(*stub, ExecuteBatchDml(_, _))
      .WillOnce(Return(Status(StatusCode::kNotFound, "Session not found")));

  auto db = Database("project", "instance", "database");
  auto conn = MakeLimitedRetryConnection(db, stub);
  auto txn = MakeReadWriteTransaction();
  SetTransactionId(txn, "test-txn-id");

  auto result = conn->ExecuteBatchDml({txn});
  EXPECT_FALSE(result.ok());
  auto const error = result.status();
  EXPECT_TRUE(IsSessionNotFound(error)) << error;
  EXPECT_THAT(txn, HasBadSession());
}
2662 
TEST(ConnectionImplTest, ExecutePartitionedDmlSessionNotFound) {
  // NOTE: There's no test here because this method does not accept a
  // Transaction and so there's no way to extract the Session to check if it's
  // bad. We could modify the API to inject/extract this, but this is a
  // user-facing API that we don't want to mess up.
}
2669 
/**
 * @test Verify Commit() surfaces a "Session not found" error and leaves the
 * transaction with a bad session.
 */
TEST(ConnectionImplTest, CommitSessionNotFound) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(Return(MakeSessionsResponse({"test-session-name"})));
  EXPECT_CALL(*stub, Commit(_, _))
      .WillOnce(Return(Status(StatusCode::kNotFound, "Session not found")));

  auto db = Database("project", "instance", "database");
  auto conn = MakeLimitedRetryConnection(db, stub);
  auto txn = MakeReadWriteTransaction();
  SetTransactionId(txn, "test-txn-id");

  auto result = conn->Commit({txn});
  EXPECT_FALSE(result.ok());
  auto const error = result.status();
  EXPECT_TRUE(IsSessionNotFound(error)) << error;
  EXPECT_THAT(txn, HasBadSession());
}
2692 
/**
 * @test Verify Rollback() surfaces a "Session not found" error and leaves
 * the transaction with a bad session.
 */
TEST(ConnectionImplTest, RollbackSessionNotFound) {
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();
  EXPECT_CALL(*stub, BatchCreateSessions(_, _))
      .WillOnce(Return(MakeSessionsResponse({"test-session-name"})));
  EXPECT_CALL(*stub, Rollback(_, _))
      .WillOnce(Return(Status(StatusCode::kNotFound, "Session not found")));

  auto db = Database("project", "instance", "database");
  auto conn = MakeLimitedRetryConnection(db, stub);
  auto txn = MakeReadWriteTransaction();
  SetTransactionId(txn, "test-txn-id");

  // Rollback() returns the status directly, with no wrapper to unpack.
  auto const error = conn->Rollback({txn});
  EXPECT_TRUE(IsSessionNotFound(error)) << error;
  EXPECT_THAT(txn, HasBadSession());
}
2714 
TEST(ConnectionImplTest,OperationsFailOnInvalidatedTransaction)2715 TEST(ConnectionImplTest, OperationsFailOnInvalidatedTransaction) {
2716   auto mock = std::make_shared<spanner_testing::MockSpannerStub>();
2717 
2718   auto db = Database("dummy_project", "dummy_instance", "dummy_database_id");
2719   auto conn = MakeConnection(
2720       db, {mock}, ConnectionOptions{grpc::InsecureChannelCredentials()});
2721 
2722   // Committing an invalidated transaction is a unilateral error.
2723   auto txn = MakeReadWriteTransaction();
2724   SetTransactionInvalid(
2725       txn, Status(StatusCode::kInvalidArgument, "BeginTransaction failed"));
2726   // All operations on an invalid transaction should return the error that
2727   // invalidated it, without actually making a RPC.
2728 
2729   EXPECT_THAT(
2730       conn->Read({txn, "table", KeySet::All(), {"Column"}}).begin()->status(),
2731       StatusIs(StatusCode::kInvalidArgument,
2732                HasSubstr("BeginTransaction failed")));
2733 
2734   EXPECT_THAT(conn->PartitionRead({{txn, "table", KeySet::All(), {"Column"}}}),
2735               StatusIs(StatusCode::kInvalidArgument,
2736                        HasSubstr("BeginTransaction failed")));
2737 
2738   EXPECT_THAT(
2739       conn->ExecuteQuery({txn, SqlStatement("select 1")}).begin()->status(),
2740       StatusIs(StatusCode::kInvalidArgument,
2741                HasSubstr("BeginTransaction failed")));
2742 
2743   EXPECT_THAT(conn->ExecuteDml({txn, SqlStatement("delete * from table")}),
2744               StatusIs(StatusCode::kInvalidArgument,
2745                        HasSubstr("BeginTransaction failed")));
2746 
2747   EXPECT_THAT(
2748       conn->ProfileQuery({txn, SqlStatement("select 1")}).begin()->status(),
2749       StatusIs(StatusCode::kInvalidArgument,
2750                HasSubstr("BeginTransaction failed")));
2751 
2752   EXPECT_THAT(conn->ProfileDml({txn, SqlStatement("delete * from table")}),
2753               StatusIs(StatusCode::kInvalidArgument,
2754                        HasSubstr("BeginTransaction failed")));
2755 
2756   EXPECT_THAT(conn->AnalyzeSql({txn, SqlStatement("select * from table")}),
2757               StatusIs(StatusCode::kInvalidArgument,
2758                        HasSubstr("BeginTransaction failed")));
2759 
2760   // ExecutePartitionedDml creates its own transaction so it's not tested here.
2761 
2762   EXPECT_THAT(conn->PartitionQuery({txn, SqlStatement("select * from table")}),
2763               StatusIs(StatusCode::kInvalidArgument,
2764                        HasSubstr("BeginTransaction failed")));
2765 
2766   EXPECT_THAT(conn->ExecuteBatchDml({txn}),
2767               StatusIs(StatusCode::kInvalidArgument,
2768                        HasSubstr("BeginTransaction failed")));
2769 
2770   EXPECT_THAT(conn->Commit({txn}),
2771               StatusIs(StatusCode::kInvalidArgument,
2772                        HasSubstr("BeginTransaction failed")));
2773 
2774   EXPECT_THAT(conn->Rollback({txn}),
2775               StatusIs(StatusCode::kInvalidArgument,
2776                        HasSubstr("BeginTransaction failed")));
2777 }
2778 
2779 #if defined(__GNUC__) || defined(__clang__)
2780 #pragma GCC diagnostic pop
2781 #endif
2782 
2783 }  // namespace
2784 }  // namespace internal
2785 }  // namespace SPANNER_CLIENT_NS
2786 }  // namespace spanner
2787 }  // namespace cloud
2788 }  // namespace google
2789