1 // Copyright 2019 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "google/cloud/spanner/client.h"
16 #include "google/cloud/spanner/database.h"
17 #include "google/cloud/spanner/database_admin_client.h"
18 #include "google/cloud/spanner/mutations.h"
19 #include "google/cloud/spanner/testing/database_integration_test.h"
20 #include "google/cloud/internal/getenv.h"
21 #include "google/cloud/internal/random.h"
22 #include "google/cloud/testing_util/assert_ok.h"
23 #include "absl/memory/memory.h"
24 #include <gmock/gmock.h>
25 
26 namespace google {
27 namespace cloud {
28 namespace spanner {
29 inline namespace SPANNER_CLIENT_NS {
30 namespace {
31 
32 using ::testing::UnorderedElementsAre;
33 using ::testing::UnorderedElementsAreArray;
34 
35 class ClientIntegrationTest : public spanner_testing::DatabaseIntegrationTest {
36  public:
ClientIntegrationTest()37   ClientIntegrationTest()
38       : emulator_(google::cloud::internal::GetEnv("SPANNER_EMULATOR_HOST")
39                       .has_value()) {}
40 
SetUpTestSuite()41   static void SetUpTestSuite() {
42     spanner_testing::DatabaseIntegrationTest::SetUpTestSuite();
43     client_ = absl::make_unique<Client>(MakeConnection(GetDatabase()));
44   }
45 
SetUp()46   void SetUp() override {
47     auto commit_result = client_->Commit(
48         Mutations{MakeDeleteMutation("Singers", KeySet::All())});
49     EXPECT_STATUS_OK(commit_result);
50   }
51 
InsertTwoSingers()52   static void InsertTwoSingers() {
53     auto commit_result = client_->Commit(Mutations{
54         InsertMutationBuilder("Singers", {"SingerId", "FirstName", "LastName"})
55             .EmplaceRow(1, "test-fname-1", "test-lname-1")
56             .EmplaceRow(2, "test-fname-2", "test-lname-2")
57             .Build()});
58     ASSERT_STATUS_OK(commit_result);
59   }
60 
TearDownTestSuite()61   static void TearDownTestSuite() {
62     client_ = nullptr;
63     spanner_testing::DatabaseIntegrationTest::TearDownTestSuite();
64   }
65 
66  protected:
EmulatorUnimplemented(Status const & status)67   bool EmulatorUnimplemented(Status const& status) {
68     return emulator_ && status.code() == StatusCode::kUnimplemented;
69   }
70 
71   bool emulator_;
72   static std::unique_ptr<Client> client_;
73 };
74 
75 std::unique_ptr<Client> ClientIntegrationTest::client_;
76 
77 /// @test Verify the basic insert operations for transaction commits.
TEST_F(ClientIntegrationTest,InsertAndCommit)78 TEST_F(ClientIntegrationTest, InsertAndCommit) {
79   ASSERT_NO_FATAL_FAILURE(InsertTwoSingers());
80 
81   auto rows = client_->Read("Singers", KeySet::All(),
82                             {"SingerId", "FirstName", "LastName"});
83   using RowType = std::tuple<std::int64_t, std::string, std::string>;
84   std::vector<RowType> returned_rows;
85   int row_number = 0;
86   for (auto& row : StreamOf<RowType>(rows)) {
87     EXPECT_STATUS_OK(row);
88     if (!row) break;
89     SCOPED_TRACE("Parsing row[" + std::to_string(row_number++) + "]");
90     returned_rows.push_back(*std::move(row));
91   }
92   EXPECT_THAT(returned_rows,
93               UnorderedElementsAre(RowType(1, "test-fname-1", "test-lname-1"),
94                                    RowType(2, "test-fname-2", "test-lname-2")));
95 }
96 
97 /// @test Verify the basic delete mutations work.
TEST_F(ClientIntegrationTest,DeleteAndCommit)98 TEST_F(ClientIntegrationTest, DeleteAndCommit) {
99   ASSERT_NO_FATAL_FAILURE(InsertTwoSingers());
100 
101   auto commit_result = client_->Commit(
102       Mutations{MakeDeleteMutation("Singers", KeySet().AddKey(MakeKey(1)))});
103   EXPECT_STATUS_OK(commit_result);
104 
105   auto rows = client_->Read("Singers", KeySet::All(),
106                             {"SingerId", "FirstName", "LastName"});
107 
108   using RowType = std::tuple<std::int64_t, std::string, std::string>;
109   std::vector<RowType> returned_rows;
110   int row_number = 0;
111   for (auto& row : StreamOf<RowType>(rows)) {
112     EXPECT_STATUS_OK(row);
113     if (!row) break;
114     SCOPED_TRACE("Parsing row[" + std::to_string(row_number++) + "]");
115     returned_rows.push_back(*std::move(row));
116   }
117   EXPECT_THAT(returned_rows,
118               UnorderedElementsAre(RowType(2, "test-fname-2", "test-lname-2")));
119 }
120 
121 /// @test Verify that read-write transactions with multiple statements work.
TEST_F(ClientIntegrationTest,MultipleInserts)122 TEST_F(ClientIntegrationTest, MultipleInserts) {
123   ASSERT_NO_FATAL_FAILURE(InsertTwoSingers());
124 
125   auto& client = *client_;
126   auto commit_result =
127       client_->Commit([&client](Transaction const& txn) -> StatusOr<Mutations> {
128         auto insert1 = client.ExecuteDml(
129             txn,
130             SqlStatement("INSERT INTO Singers (SingerId, FirstName, LastName) "
131                          "VALUES (@id, @fname, @lname)",
132                          {{"id", Value(3)},
133                           {"fname", Value("test-fname-3")},
134                           {"lname", Value("test-lname-3")}}));
135         if (!insert1) return std::move(insert1).status();
136         auto insert2 = client.ExecuteDml(
137             txn,
138             SqlStatement("INSERT INTO Singers (SingerId, FirstName, LastName) "
139                          "VALUES (@id, @fname, @lname)",
140                          {{"id", Value(4)},
141                           {"fname", Value("test-fname-4")},
142                           {"lname", Value("test-lname-4")}}));
143         if (!insert2) return std::move(insert2).status();
144         return Mutations{};
145       });
146   EXPECT_STATUS_OK(commit_result);
147 
148   auto rows = client_->Read("Singers", KeySet::All(),
149                             {"SingerId", "FirstName", "LastName"});
150 
151   using RowType = std::tuple<std::int64_t, std::string, std::string>;
152   std::vector<RowType> returned_rows;
153   int row_number = 0;
154   for (auto& row : StreamOf<RowType>(rows)) {
155     EXPECT_STATUS_OK(row);
156     if (!row) break;
157     SCOPED_TRACE("Parsing row[" + std::to_string(row_number++) + "]");
158     returned_rows.push_back(*std::move(row));
159   }
160   EXPECT_THAT(returned_rows,
161               UnorderedElementsAre(RowType(1, "test-fname-1", "test-lname-1"),
162                                    RowType(2, "test-fname-2", "test-lname-2"),
163                                    RowType(3, "test-fname-3", "test-lname-3"),
164                                    RowType(4, "test-fname-4", "test-lname-4")));
165 }
166 
167 /// @test Verify that Client::Rollback works as expected.
TEST_F(ClientIntegrationTest,TransactionRollback)168 TEST_F(ClientIntegrationTest, TransactionRollback) {
169   ASSERT_NO_FATAL_FAILURE(InsertTwoSingers());
170 
171   using RowType = std::tuple<std::int64_t, std::string, std::string>;
172 
173   // Cannot use Commit in this test because we want to call Rollback
174   // explicitly. This means we need to retry ABORTED calls ourselves.
175   Transaction txn = MakeReadWriteTransaction();
176   for (auto start = std::chrono::steady_clock::now(),
177             deadline = start + std::chrono::minutes(1);
178        start < deadline; start = std::chrono::steady_clock::now()) {
179     auto is_retryable_failure = [](StatusOr<DmlResult> const& s) {
180       return !s && s.status().code() == StatusCode::kAborted;
181     };
182 
183     // Share lock priority with the previous loop so that we have a slightly
184     // better chance of avoiding StatusCode::kAborted from ExecuteDml().
185     txn = MakeReadWriteTransaction(txn);
186 
187     auto insert1 = client_->ExecuteDml(
188         txn, SqlStatement("INSERT INTO Singers (SingerId, FirstName, LastName) "
189                           "VALUES (@id, @fname, @lname)",
190                           {{"id", Value(3)},
191                            {"fname", Value("test-fname-3")},
192                            {"lname", Value("test-lname-3")}}));
193     if (is_retryable_failure(insert1)) continue;
194     ASSERT_STATUS_OK(insert1);
195 
196     auto insert2 = client_->ExecuteDml(
197         txn, SqlStatement("INSERT INTO Singers (SingerId, FirstName, LastName) "
198                           "VALUES (@id, @fname, @lname)",
199                           {{"id", Value(4)},
200                            {"fname", Value("test-fname-4")},
201                            {"lname", Value("test-lname-4")}}));
202     if (is_retryable_failure(insert2)) continue;
203     ASSERT_STATUS_OK(insert2);
204 
205     auto rows = client_->Read(txn, "Singers", KeySet::All(),
206                               {"SingerId", "FirstName", "LastName"});
207 
208     std::vector<RowType> returned_rows;
209     int row_number = 0;
210     Status iteration_status;
211     for (auto& row : StreamOf<RowType>(rows)) {
212       if (!row) {
213         iteration_status = std::move(row).status();
214         break;
215       }
216       SCOPED_TRACE("Parsing row[" + std::to_string(row_number++) + "]");
217       returned_rows.push_back(*std::move(row));
218     }
219     if (iteration_status.code() == StatusCode::kAborted) continue;
220     ASSERT_THAT(returned_rows, UnorderedElementsAre(
221                                    RowType(1, "test-fname-1", "test-lname-1"),
222                                    RowType(2, "test-fname-2", "test-lname-2"),
223                                    RowType(3, "test-fname-3", "test-lname-3"),
224                                    RowType(4, "test-fname-4", "test-lname-4")));
225 
226     auto insert_rollback_result = client_->Rollback(txn);
227     ASSERT_STATUS_OK(insert_rollback_result);
228     break;
229   }
230 
231   std::vector<RowType> returned_rows;
232   auto rows = client_->Read("Singers", KeySet::All(),
233                             {"SingerId", "FirstName", "LastName"});
234   int row_number = 0;
235   for (auto& row : StreamOf<RowType>(rows)) {
236     EXPECT_STATUS_OK(row);
237     if (!row) break;
238     SCOPED_TRACE("Parsing row[" + std::to_string(row_number++) + "]");
239     returned_rows.push_back(*std::move(row));
240   }
241   EXPECT_THAT(returned_rows,
242               UnorderedElementsAre(RowType(1, "test-fname-1", "test-lname-1"),
243                                    RowType(2, "test-fname-2", "test-lname-2")));
244 }
245 
246 /// @test Verify the basics of Commit().
TEST_F(ClientIntegrationTest,Commit)247 TEST_F(ClientIntegrationTest, Commit) {
248   // Insert SingerIds 100, 102, and 199.
249   auto isb =
250       InsertMutationBuilder("Singers", {"SingerId", "FirstName", "LastName"})
251           .EmplaceRow(100, "first-name-100", "last-name-100")
252           .EmplaceRow(102, "first-name-102", "last-name-102")
253           .EmplaceRow(199, "first-name-199", "last-name-199");
254   auto insert_result = client_->Commit(Mutations{isb.Build()});
255   EXPECT_STATUS_OK(insert_result);
256   EXPECT_NE(Timestamp{}, insert_result->commit_timestamp);
257 
258   // Delete SingerId 102.
259   auto delete_result = client_->Commit(
260       Mutations{MakeDeleteMutation("Singers", KeySet().AddKey(MakeKey(102)))});
261   EXPECT_STATUS_OK(delete_result);
262   EXPECT_LT(insert_result->commit_timestamp, delete_result->commit_timestamp);
263 
264   // Read SingerIds [100 ... 200).
265   using RowType = std::tuple<std::int64_t>;
266   std::vector<std::int64_t> ids;
267   auto ks = KeySet().AddRange(MakeKeyBoundClosed(100), MakeKeyBoundOpen(200));
268   auto rows = client_->Read("Singers", std::move(ks), {"SingerId"});
269   for (auto const& row : StreamOf<RowType>(rows)) {
270     EXPECT_STATUS_OK(row);
271     if (row) ids.push_back(std::get<0>(*row));
272   }
273   EXPECT_THAT(ids, UnorderedElementsAre(100, 199));
274 }
275 
276 /// @test Test various forms of ExecuteQuery() and ExecuteDml()
TEST_F(ClientIntegrationTest,ExecuteQueryDml)277 TEST_F(ClientIntegrationTest, ExecuteQueryDml) {
278   auto& client = *client_;
279   auto insert_result =
280       client_->Commit([&client](Transaction txn) -> StatusOr<Mutations> {
281         auto insert = client.ExecuteDml(
282             std::move(txn), SqlStatement(R"sql(
283         INSERT INTO Singers (SingerId, FirstName, LastName)
284         VALUES (@id, @fname, @lname))sql",
285                                          {{"id", Value(1)},
286                                           {"fname", Value("test-fname-1")},
287                                           {"lname", Value("test-lname-1")}}));
288         if (!insert) return std::move(insert).status();
289         return Mutations{};
290       });
291   EXPECT_STATUS_OK(insert_result);
292 
293   using RowType = std::tuple<std::int64_t, std::string, std::string>;
294   std::vector<RowType> expected_rows;
295   auto commit_result = client_->Commit(
296       [&client, &expected_rows](Transaction const& txn) -> StatusOr<Mutations> {
297         expected_rows.clear();
298         for (int i = 2; i != 10; ++i) {
299           auto s = std::to_string(i);
300           auto insert = client.ExecuteDml(
301               txn, SqlStatement(R"sql(
302         INSERT INTO Singers (SingerId, FirstName, LastName)
303         VALUES (@id, @fname, @lname))sql",
304                                 {{"id", Value(i)},
305                                  {"fname", Value("test-fname-" + s)},
306                                  {"lname", Value("test-lname-" + s)}}));
307           if (!insert) return std::move(insert).status();
308           expected_rows.emplace_back(
309               std::make_tuple(i, "test-fname-" + s, "test-lname-" + s));
310         }
311 
312         auto delete_result =
313             client.ExecuteDml(txn, SqlStatement(R"sql(
314         DELETE FROM Singers WHERE SingerId = @id)sql",
315                                                 {{"id", Value(1)}}));
316         if (!delete_result) return std::move(delete_result).status();
317 
318         return Mutations{};
319       });
320   ASSERT_STATUS_OK(commit_result);
321 
322   auto rows = client_->ExecuteQuery(
323       SqlStatement("SELECT SingerId, FirstName, LastName FROM Singers", {}));
324 
325   std::vector<RowType> actual_rows;
326   int row_number = 0;
327   for (auto& row : StreamOf<RowType>(rows)) {
328     EXPECT_STATUS_OK(row);
329     if (!row) break;
330     SCOPED_TRACE("Parsing row[" + std::to_string(row_number++) + "]");
331     actual_rows.push_back(*std::move(row));
332   }
333   EXPECT_THAT(actual_rows, UnorderedElementsAreArray(expected_rows));
334 }
335 
TEST_F(ClientIntegrationTest,QueryOptionsWork)336 TEST_F(ClientIntegrationTest, QueryOptionsWork) {
337   ASSERT_NO_FATAL_FAILURE(InsertTwoSingers());
338 
339   // An optimizer_version of "latest" should work.
340   auto rows = client_->ExecuteQuery(
341       SqlStatement("SELECT FirstName, LastName FROM Singers"),
342       QueryOptions().set_optimizer_version("latest"));
343   int row_count = 0;
344   for (auto const& row : rows) {
345     ASSERT_STATUS_OK(row);
346     ++row_count;
347   }
348   EXPECT_EQ(2, row_count);
349 
350   // An invalid optimizer_version should produce an error.
351   rows = client_->ExecuteQuery(
352       SqlStatement("SELECT FirstName, LastName FROM Singers"),
353       QueryOptions().set_optimizer_version("some-invalid-version"));
354   row_count = 0;
355   bool got_error = false;
356   for (auto const& row : rows) {
357     if (!row) {
358       got_error = true;
359       break;
360     }
361     ++row_count;
362   }
363   if (!emulator_) {
364     EXPECT_TRUE(got_error) << "An invalid optimizer version should be an error";
365     EXPECT_EQ(0, row_count);
366   } else {
367     EXPECT_FALSE(got_error) << "An invalid optimizer version should be OK";
368     EXPECT_EQ(2, row_count);
369   }
370 }
371 
372 /// @test Test ExecutePartitionedDml
TEST_F(ClientIntegrationTest,ExecutePartitionedDml)373 TEST_F(ClientIntegrationTest, ExecutePartitionedDml) {
374   auto& client = *client_;
375   auto insert_result =
376       client_->Commit([&client](Transaction txn) -> StatusOr<Mutations> {
377         auto insert = client.ExecuteDml(
378             std::move(txn), SqlStatement(R"sql(
379         INSERT INTO Singers (SingerId, FirstName, LastName)
380         VALUES (@id, @fname, @lname))sql",
381                                          {{"id", Value(1)},
382                                           {"fname", Value("test-fname-1")},
383                                           {"lname", Value("test-lname-1")}}));
384         if (!insert) return std::move(insert).status();
385         return Mutations{};
386       });
387   EXPECT_STATUS_OK(insert_result);
388 
389   auto result = client_->ExecutePartitionedDml(
390       SqlStatement("UPDATE Singers SET LastName = 'test-only'"
391                    " WHERE SingerId >= 1"));
392   EXPECT_STATUS_OK(result);
393 }
394 
CheckReadWithOptions(Client client,std::function<Transaction::SingleUseOptions (CommitResult const &)> const & options_generator)395 void CheckReadWithOptions(
396     Client client,
397     std::function<Transaction::SingleUseOptions(CommitResult const&)> const&
398         options_generator) {
399   using RowValues = std::vector<Value>;
400   std::vector<RowValues> expected_rows;
401   auto commit = client.Commit(
402       [&expected_rows](Transaction const&) -> StatusOr<Mutations> {
403         expected_rows.clear();
404         InsertMutationBuilder insert("Singers",
405                                      {"SingerId", "FirstName", "LastName"});
406         for (int i = 1; i != 10; ++i) {
407           auto s = std::to_string(i);
408           auto row = RowValues{Value(i), Value("test-fname-" + s),
409                                Value("test-lname-" + s)};
410           insert.AddRow(row);
411           expected_rows.push_back(row);
412         }
413         return Mutations{std::move(insert).Build()};
414       });
415   ASSERT_STATUS_OK(commit);
416 
417   auto rows = client.Read(options_generator(*commit), "Singers", KeySet::All(),
418                           {"SingerId", "FirstName", "LastName"});
419 
420   std::vector<RowValues> actual_rows;
421   int row_number = 0;
422   for (auto& row :
423        StreamOf<std::tuple<std::int64_t, std::string, std::string>>(rows)) {
424     SCOPED_TRACE("Reading row[" + std::to_string(row_number++) + "]");
425     EXPECT_STATUS_OK(row);
426     if (!row) break;
427     std::vector<Value> v;
428     v.emplace_back(std::get<0>(*row));
429     v.emplace_back(std::get<1>(*row));
430     v.emplace_back(std::get<2>(*row));
431     actual_rows.push_back(std::move(v));
432   }
433   EXPECT_THAT(actual_rows, UnorderedElementsAreArray(expected_rows));
434 }
435 
436 /// @test Test Read() with bounded staleness set by a timestamp.
TEST_F(ClientIntegrationTest,ReadBoundedStalenessTimestamp)437 TEST_F(ClientIntegrationTest, ReadBoundedStalenessTimestamp) {
438   CheckReadWithOptions(*client_, [](CommitResult const& result) {
439     return Transaction::SingleUseOptions(
440         /*min_read_timestamp=*/result.commit_timestamp);
441   });
442 }
443 
444 /// @test Test Read() with bounded staleness set by duration.
TEST_F(ClientIntegrationTest,ReadBoundedStalenessDuration)445 TEST_F(ClientIntegrationTest, ReadBoundedStalenessDuration) {
446   CheckReadWithOptions(*client_, [](CommitResult const&) {
447     // We want a duration sufficiently recent to include the latest commit.
448     return Transaction::SingleUseOptions(
449         /*max_staleness=*/std::chrono::nanoseconds(1));
450   });
451 }
452 
453 /// @test Test Read() with exact staleness set to "all previous transactions".
TEST_F(ClientIntegrationTest,ReadExactStalenessLatest)454 TEST_F(ClientIntegrationTest, ReadExactStalenessLatest) {
455   CheckReadWithOptions(*client_, [](CommitResult const&) {
456     return Transaction::SingleUseOptions(Transaction::ReadOnlyOptions());
457   });
458 }
459 
460 /// @test Test Read() with exact staleness set by a timestamp.
TEST_F(ClientIntegrationTest,ReadExactStalenessTimestamp)461 TEST_F(ClientIntegrationTest, ReadExactStalenessTimestamp) {
462   CheckReadWithOptions(*client_, [](CommitResult const& result) {
463     return Transaction::SingleUseOptions(Transaction::ReadOnlyOptions(
464         /*read_timestamp=*/result.commit_timestamp));
465   });
466 }
467 
468 /// @test Test Read() with exact staleness set by duration.
TEST_F(ClientIntegrationTest,ReadExactStalenessDuration)469 TEST_F(ClientIntegrationTest, ReadExactStalenessDuration) {
470   CheckReadWithOptions(*client_, [](CommitResult const&) {
471     return Transaction::SingleUseOptions(Transaction::ReadOnlyOptions(
472         /*exact_staleness=*/std::chrono::nanoseconds(0)));
473   });
474 }
475 
CheckExecuteQueryWithSingleUseOptions(Client client,std::function<Transaction::SingleUseOptions (CommitResult const &)> const & options_generator)476 void CheckExecuteQueryWithSingleUseOptions(
477     Client client,
478     std::function<Transaction::SingleUseOptions(CommitResult const&)> const&
479         options_generator) {
480   using RowValues = std::vector<Value>;
481   std::vector<RowValues> expected_rows;
482   auto commit = client.Commit(
483       [&expected_rows](Transaction const&) -> StatusOr<Mutations> {
484         InsertMutationBuilder insert("Singers",
485                                      {"SingerId", "FirstName", "LastName"});
486         expected_rows.clear();
487         for (int i = 1; i != 10; ++i) {
488           auto s = std::to_string(i);
489           auto row = RowValues{Value(i), Value("test-fname-" + s),
490                                Value("test-lname-" + s)};
491           insert.AddRow(row);
492           expected_rows.push_back(row);
493         }
494         return Mutations{std::move(insert).Build()};
495       });
496   ASSERT_STATUS_OK(commit);
497 
498   auto rows = client.ExecuteQuery(
499       options_generator(*commit),
500       SqlStatement("SELECT SingerId, FirstName, LastName FROM Singers"));
501 
502   std::vector<RowValues> actual_rows;
503   int row_number = 0;
504   for (auto& row :
505        StreamOf<std::tuple<std::int64_t, std::string, std::string>>(rows)) {
506     SCOPED_TRACE("Reading row[" + std::to_string(row_number++) + "]");
507     EXPECT_STATUS_OK(row);
508     if (!row) break;
509     std::vector<Value> v;
510     v.emplace_back(std::get<0>(*row));
511     v.emplace_back(std::get<1>(*row));
512     v.emplace_back(std::get<2>(*row));
513     actual_rows.push_back(std::move(v));
514   }
515 
516   EXPECT_THAT(actual_rows, UnorderedElementsAreArray(expected_rows));
517 }
518 
519 /// @test Test ExecuteQuery() with bounded staleness set by a timestamp.
TEST_F(ClientIntegrationTest,ExecuteQueryBoundedStalenessTimestamp)520 TEST_F(ClientIntegrationTest, ExecuteQueryBoundedStalenessTimestamp) {
521   CheckExecuteQueryWithSingleUseOptions(
522       *client_, [](CommitResult const& result) {
523         return Transaction::SingleUseOptions(
524             /*min_read_timestamp=*/result.commit_timestamp);
525       });
526 }
527 
528 /// @test Test ExecuteQuery() with bounded staleness set by duration.
TEST_F(ClientIntegrationTest,ExecuteQueryBoundedStalenessDuration)529 TEST_F(ClientIntegrationTest, ExecuteQueryBoundedStalenessDuration) {
530   CheckExecuteQueryWithSingleUseOptions(*client_, [](CommitResult const&) {
531     // We want a duration sufficiently recent to include the latest commit.
532     return Transaction::SingleUseOptions(
533         /*max_staleness=*/std::chrono::nanoseconds(1));
534   });
535 }
536 
537 /// @test Test ExecuteQuery() with exact staleness set to "all previous
538 /// transactions".
TEST_F(ClientIntegrationTest,ExecuteQueryExactStalenessLatest)539 TEST_F(ClientIntegrationTest, ExecuteQueryExactStalenessLatest) {
540   CheckExecuteQueryWithSingleUseOptions(*client_, [](CommitResult const&) {
541     return Transaction::SingleUseOptions(Transaction::ReadOnlyOptions());
542   });
543 }
544 
545 /// @test Test ExecuteQuery() with exact staleness set by a timestamp.
TEST_F(ClientIntegrationTest,ExecuteQueryExactStalenessTimestamp)546 TEST_F(ClientIntegrationTest, ExecuteQueryExactStalenessTimestamp) {
547   CheckExecuteQueryWithSingleUseOptions(
548       *client_, [](CommitResult const& result) {
549         return Transaction::SingleUseOptions(Transaction::ReadOnlyOptions(
550             /*read_timestamp=*/result.commit_timestamp));
551       });
552 }
553 
554 /// @test Test ExecuteQuery() with exact staleness set by duration.
TEST_F(ClientIntegrationTest,ExecuteQueryExactStalenessDuration)555 TEST_F(ClientIntegrationTest, ExecuteQueryExactStalenessDuration) {
556   CheckExecuteQueryWithSingleUseOptions(*client_, [](CommitResult const&) {
557     return Transaction::SingleUseOptions(Transaction::ReadOnlyOptions(
558         /*exact_staleness=*/std::chrono::nanoseconds(0)));
559   });
560 }
561 
AddSingerDataToTable(Client client)562 StatusOr<std::vector<std::vector<Value>>> AddSingerDataToTable(Client client) {
563   std::vector<std::vector<Value>> expected_rows;
564   auto commit = client.Commit(
565       [&expected_rows](Transaction const&) -> StatusOr<Mutations> {
566         expected_rows.clear();
567         InsertMutationBuilder insert("Singers",
568                                      {"SingerId", "FirstName", "LastName"});
569         for (int i = 1; i != 10; ++i) {
570           auto s = std::to_string(i);
571           auto row = std::vector<Value>{Value(i), Value("test-fname-" + s),
572                                         Value("test-lname-" + s)};
573           insert.AddRow(row);
574           expected_rows.push_back(row);
575         }
576         return Mutations{std::move(insert).Build()};
577       });
578   if (!commit.ok()) {
579     return commit.status();
580   }
581   return expected_rows;
582 }
583 
TEST_F(ClientIntegrationTest,PartitionRead)584 TEST_F(ClientIntegrationTest, PartitionRead) {
585   auto expected_rows = AddSingerDataToTable(*client_);
586   ASSERT_STATUS_OK(expected_rows);
587 
588   auto ro_transaction = MakeReadOnlyTransaction();
589   auto read_partitions =
590       client_->PartitionRead(ro_transaction, "Singers", KeySet::All(),
591                              {"SingerId", "FirstName", "LastName"});
592   ASSERT_STATUS_OK(read_partitions);
593 
594   std::vector<std::string> serialized_partitions;
595   for (auto const& partition : *read_partitions) {
596     auto serialized_partition = SerializeReadPartition(partition);
597     ASSERT_STATUS_OK(serialized_partition);
598     serialized_partitions.push_back(*serialized_partition);
599   }
600 
601   std::vector<std::vector<Value>> actual_rows;
602   int partition_number = 0;
603   for (auto const& partition : serialized_partitions) {
604     int row_number = 0;
605     auto deserialized_partition = DeserializeReadPartition(partition);
606     ASSERT_STATUS_OK(deserialized_partition);
607     auto rows = client_->Read(*deserialized_partition);
608     for (auto& row :
609          StreamOf<std::tuple<std::int64_t, std::string, std::string>>(rows)) {
610       SCOPED_TRACE("Reading partition[" + std::to_string(partition_number++) +
611                    "] row[" + std::to_string(row_number++) + "]");
612       EXPECT_STATUS_OK(row);
613       if (!row) break;
614       std::vector<Value> v;
615       v.emplace_back(std::get<0>(*row));
616       v.emplace_back(std::get<1>(*row));
617       v.emplace_back(std::get<2>(*row));
618       actual_rows.push_back(std::move(v));
619     }
620   }
621 
622   EXPECT_THAT(actual_rows, UnorderedElementsAreArray(*expected_rows));
623 }
624 
TEST_F(ClientIntegrationTest,PartitionQuery)625 TEST_F(ClientIntegrationTest, PartitionQuery) {
626   auto expected_rows = AddSingerDataToTable(*client_);
627   ASSERT_STATUS_OK(expected_rows);
628 
629   auto ro_transaction = MakeReadOnlyTransaction();
630   auto query_partitions = client_->PartitionQuery(
631       ro_transaction,
632       SqlStatement("select SingerId, FirstName, LastName from Singers"));
633   ASSERT_STATUS_OK(query_partitions);
634 
635   std::vector<std::string> serialized_partitions;
636   for (auto const& partition : *query_partitions) {
637     auto serialized_partition = SerializeQueryPartition(partition);
638     ASSERT_STATUS_OK(serialized_partition);
639     serialized_partitions.push_back(*serialized_partition);
640   }
641 
642   std::vector<std::vector<Value>> actual_rows;
643   int partition_number = 0;
644   for (auto const& partition : serialized_partitions) {
645     int row_number = 0;
646     auto deserialized_partition = DeserializeQueryPartition(partition);
647     ASSERT_STATUS_OK(deserialized_partition);
648     auto rows = client_->ExecuteQuery(*deserialized_partition);
649     for (auto& row :
650          StreamOf<std::tuple<std::int64_t, std::string, std::string>>(rows)) {
651       SCOPED_TRACE("Reading partition[" + std::to_string(partition_number++) +
652                    "] row[" + std::to_string(row_number++) + "]");
653       EXPECT_STATUS_OK(row);
654       if (!row) break;
655       std::vector<Value> v;
656       v.emplace_back(std::get<0>(*row));
657       v.emplace_back(std::get<1>(*row));
658       v.emplace_back(std::get<2>(*row));
659       actual_rows.push_back(std::move(v));
660     }
661   }
662 
663   EXPECT_THAT(actual_rows, UnorderedElementsAreArray(*expected_rows));
664 }
665 
TEST_F(ClientIntegrationTest,ExecuteBatchDml)666 TEST_F(ClientIntegrationTest, ExecuteBatchDml) {
667   auto statements = {
668       SqlStatement("INSERT INTO Singers (SingerId, FirstName, LastName) "
669                    "VALUES(1, 'Foo1', 'Bar1')"),
670       SqlStatement("INSERT INTO Singers (SingerId, FirstName, LastName) "
671                    "VALUES(2, 'Foo2', 'Bar2')"),
672       SqlStatement("INSERT INTO Singers (SingerId, FirstName, LastName) "
673                    "VALUES(3, 'Foo3', 'Bar3')"),
674       SqlStatement("UPDATE Singers SET FirstName = \"FOO\" "
675                    "WHERE FirstName = 'Foo1' or FirstName = 'Foo3'"),
676   };
677 
678   auto& client = *client_;
679   StatusOr<BatchDmlResult> batch_result;
680   auto commit_result =
681       client_->Commit([&client, &batch_result,
682                        &statements](Transaction txn) -> StatusOr<Mutations> {
683         batch_result = client.ExecuteBatchDml(std::move(txn), statements);
684         if (!batch_result) return batch_result.status();
685         if (!batch_result->status.ok()) return batch_result->status;
686         return Mutations{};
687       });
688 
689   ASSERT_STATUS_OK(commit_result);
690   ASSERT_STATUS_OK(batch_result);
691   ASSERT_STATUS_OK(batch_result->status);
692   ASSERT_EQ(batch_result->stats.size(), 4);
693   ASSERT_EQ(batch_result->stats[0].row_count, 1);
694   ASSERT_EQ(batch_result->stats[1].row_count, 1);
695   ASSERT_EQ(batch_result->stats[2].row_count, 1);
696   ASSERT_EQ(batch_result->stats[3].row_count, 2);
697 
698   auto rows = client_->ExecuteQuery(SqlStatement(
699       "SELECT SingerId, FirstName, LastName FROM Singers ORDER BY SingerId"));
700 
701   struct Expectation {
702     std::int64_t id;
703     std::string fname;
704     std::string lname;
705   };
706   auto expected = std::vector<Expectation>{
707       Expectation{1, "FOO", "Bar1"},
708       Expectation{2, "Foo2", "Bar2"},
709       Expectation{3, "FOO", "Bar3"},
710   };
711   std::size_t counter = 0;
712   for (auto const& row :
713        StreamOf<std::tuple<std::int64_t, std::string, std::string>>(rows)) {
714     ASSERT_STATUS_OK(row);
715     ASSERT_EQ(std::get<0>(*row), expected[counter].id);
716     ASSERT_EQ(std::get<1>(*row), expected[counter].fname);
717     ASSERT_EQ(std::get<2>(*row), expected[counter].lname);
718     ++counter;
719   }
720   ASSERT_EQ(counter, expected.size());
721 }
722 
TEST_F(ClientIntegrationTest,ExecuteBatchDmlMany)723 TEST_F(ClientIntegrationTest, ExecuteBatchDmlMany) {
724   std::vector<SqlStatement> v;
725   constexpr auto kBatchSize = 200;
726   for (int i = 0; i < kBatchSize; ++i) {
727     std::string const singer_id = std::to_string(i);
728     std::string const first_name = "Foo" + singer_id;
729     std::string const last_name = "Bar" + singer_id;
730     std::string insert =
731         "INSERT INTO Singers (SingerId, FirstName, LastName) Values(";
732     insert += singer_id + ", '";
733     insert += first_name + "', '";
734     insert += last_name + "')";
735     v.emplace_back(insert);
736   }
737 
738   std::vector<SqlStatement> left(v.begin(), v.begin() + v.size() / 2);
739   std::vector<SqlStatement> right(v.begin() + v.size() / 2, v.end());
740 
741   auto& client = *client_;
742   StatusOr<BatchDmlResult> batch_result_left;
743   StatusOr<BatchDmlResult> batch_result_right;
744   auto commit_result =
745       client_->Commit([&client, &batch_result_left, &batch_result_right, &left,
746                        &right](Transaction txn) -> StatusOr<Mutations> {
747         batch_result_left = client.ExecuteBatchDml(txn, left);
748         if (!batch_result_left) return batch_result_left.status();
749         if (!batch_result_left->status.ok()) return batch_result_left->status;
750 
751         batch_result_right = client.ExecuteBatchDml(std::move(txn), right);
752         if (!batch_result_right) return batch_result_right.status();
753         if (!batch_result_right->status.ok()) return batch_result_right->status;
754 
755         return Mutations{};
756       });
757 
758   ASSERT_STATUS_OK(commit_result);
759 
760   ASSERT_STATUS_OK(batch_result_left);
761   EXPECT_EQ(batch_result_left->stats.size(), left.size());
762   EXPECT_STATUS_OK(batch_result_left->status);
763   for (auto const& stats : batch_result_left->stats) {
764     ASSERT_EQ(stats.row_count, 1);
765   }
766 
767   ASSERT_STATUS_OK(batch_result_right);
768   EXPECT_EQ(batch_result_right->stats.size(), right.size());
769   EXPECT_STATUS_OK(batch_result_right->status);
770   for (auto const& stats : batch_result_right->stats) {
771     ASSERT_EQ(stats.row_count, 1);
772   }
773 
774   auto rows = client_->ExecuteQuery(SqlStatement(
775       "SELECT SingerId, FirstName, LastName FROM Singers ORDER BY SingerId"));
776 
777   auto counter = 0;
778   for (auto const& row :
779        StreamOf<std::tuple<std::int64_t, std::string, std::string>>(rows)) {
780     ASSERT_STATUS_OK(row);
781     std::string const singer_id = std::to_string(counter);
782     std::string const first_name = "Foo" + singer_id;
783     std::string const last_name = "Bar" + singer_id;
784     ASSERT_EQ(std::get<0>(*row), counter);
785     ASSERT_EQ(std::get<1>(*row), first_name);
786     ASSERT_EQ(std::get<2>(*row), last_name);
787     ++counter;
788   }
789 
790   ASSERT_EQ(counter, kBatchSize);
791 }
792 
TEST_F(ClientIntegrationTest,ExecuteBatchDmlFailure)793 TEST_F(ClientIntegrationTest, ExecuteBatchDmlFailure) {
794   auto statements = {
795       SqlStatement("INSERT INTO Singers (SingerId, FirstName, LastName) "
796                    "VALUES(1, 'Foo1', 'Bar1')"),
797       SqlStatement("INSERT INTO Singers (SingerId, FirstName, LastName) "
798                    "VALUES(2, 'Foo2', 'Bar2')"),
799       SqlStatement("INSERT OOPS SYNTAX ERROR"),
800       SqlStatement("UPDATE Singers SET FirstName = 'FOO' "
801                    "WHERE FirstName = 'Foo1' or FirstName = 'Foo3'"),
802   };
803 
804   auto& client = *client_;
805   StatusOr<BatchDmlResult> batch_result;
806   auto commit_result =
807       client_->Commit([&client, &batch_result,
808                        &statements](Transaction txn) -> StatusOr<Mutations> {
809         batch_result = client.ExecuteBatchDml(std::move(txn), statements);
810         if (!batch_result) return batch_result.status();
811         if (!batch_result->status.ok()) return batch_result->status;
812         return Mutations{};
813       });
814 
815   ASSERT_FALSE(commit_result.ok());
816   ASSERT_STATUS_OK(batch_result);
817   ASSERT_FALSE(batch_result->status.ok());
818   ASSERT_EQ(batch_result->stats.size(), 2);
819   ASSERT_EQ(batch_result->stats[0].row_count, 1);
820   ASSERT_EQ(batch_result->stats[1].row_count, 1);
821 }
822 
TEST_F(ClientIntegrationTest,AnalyzeSql)823 TEST_F(ClientIntegrationTest, AnalyzeSql) {
824   auto txn = MakeReadOnlyTransaction();
825   auto sql = SqlStatement(
826       "SELECT * FROM Singers  "
827       "WHERE FirstName = 'Foo1' OR FirstName = 'Foo3'");
828 
829   // This returns a ExecutionPlan without executing the query.
830   auto plan = client_->AnalyzeSql(std::move(txn), std::move(sql));
831   if (!EmulatorUnimplemented(plan.status())) {
832     ASSERT_STATUS_OK(plan);
833     EXPECT_GT(plan->plan_nodes_size(), 0);
834   }
835 }
836 
TEST_F(ClientIntegrationTest,ProfileQuery)837 TEST_F(ClientIntegrationTest, ProfileQuery) {
838   auto txn = MakeReadOnlyTransaction();
839   auto sql = SqlStatement("SELECT * FROM Singers");
840 
841   auto rows = client_->ProfileQuery(std::move(txn), std::move(sql));
842   // Consume all the rows to make the profile info available.
843   for (auto const& row : rows) {
844     ASSERT_STATUS_OK(row);
845   }
846 
847   auto stats = rows.ExecutionStats();
848   EXPECT_TRUE(stats);
849   EXPECT_GT(stats->size(), 0);
850 
851   auto plan = rows.ExecutionPlan();
852   if (!emulator_ || plan) {
853     EXPECT_TRUE(plan);
854     EXPECT_GT(plan->plan_nodes_size(), 0);
855   }
856 }
857 
TEST_F(ClientIntegrationTest,ProfileDml)858 TEST_F(ClientIntegrationTest, ProfileDml) {
859   auto& client = *client_;
860   ProfileDmlResult profile_result;
861   auto commit_result = client_->Commit(
862       [&client, &profile_result](Transaction txn) -> StatusOr<Mutations> {
863         auto sql = SqlStatement(
864             "INSERT INTO Singers (SingerId, FirstName, LastName) "
865             "VALUES(1, 'Foo1', 'Bar1')");
866         auto dml_profile = client.ProfileDml(std::move(txn), std::move(sql));
867         if (!dml_profile) return dml_profile.status();
868         profile_result = std::move(*dml_profile);
869         return Mutations{};
870       });
871   ASSERT_STATUS_OK(commit_result);
872 
873   EXPECT_EQ(1, profile_result.RowsModified());
874 
875   auto stats = profile_result.ExecutionStats();
876   EXPECT_TRUE(stats);
877   EXPECT_GT(stats->size(), 0);
878 
879   auto plan = profile_result.ExecutionPlan();
880   if (!emulator_ || plan) {
881     EXPECT_TRUE(plan);
882     EXPECT_GT(plan->plan_nodes_size(), 0);
883   }
884 }
885 
886 }  // namespace
887 }  // namespace SPANNER_CLIENT_NS
888 }  // namespace spanner
889 }  // namespace cloud
890 }  // namespace google
891