1 // Copyright 2019 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "google/cloud/bigquery/internal/storage_stub.h"
16 #include "google/cloud/bigquery/connection.h"
17 #include "google/cloud/bigquery/connection_options.h"
18 #include "google/cloud/bigquery/internal/stream_reader.h"
19 #include "google/cloud/bigquery/version.h"
20 #include "google/cloud/grpc_error_delegate.h"
21 #include "google/cloud/status_or.h"
22 #include "absl/types/optional.h"
23 #include <google/cloud/bigquery/storage/v1beta1/storage.grpc.pb.h>
24 #include <google/cloud/bigquery/storage/v1beta1/storage.pb.h>
25 #include <grpcpp/create_channel.h>
26 #include <memory>
27 
28 namespace google {
29 namespace cloud {
30 namespace bigquery {
31 inline namespace BIGQUERY_CLIENT_NS {
32 namespace internal {
33 
34 namespace {
35 
36 constexpr auto kRoutingHeader = "x-goog-request-params";
37 
38 namespace bigquerystorage_proto = ::google::cloud::bigquery::storage::v1beta1;
39 
40 using ::google::cloud::MakeStatusFromRpcError;
41 using ::google::cloud::StatusOr;
42 
43 // An implementation of StreamReader for gRPC unary-streaming methods.
44 template <class T>
45 class GrpcStreamReader : public StreamReader<T> {
46  public:
GrpcStreamReader(std::unique_ptr<grpc::ClientContext> context,std::unique_ptr<grpc::ClientReaderInterface<T>> reader)47   GrpcStreamReader(std::unique_ptr<grpc::ClientContext> context,
48                    std::unique_ptr<grpc::ClientReaderInterface<T>> reader)
49       : context_(std::move(context)), reader_(std::move(reader)) {}
50 
NextValue()51   StatusOr<absl::optional<T>> NextValue() override {
52     T t;
53     if (reader_->Read(&t)) {
54       return absl::optional<T>(t);
55     }
56     grpc::Status grpc_status = reader_->Finish();
57     if (!grpc_status.ok()) {
58       return MakeStatusFromRpcError(grpc_status);
59     }
60     return absl::optional<T>();
61   }
62 
63  private:
64   std::unique_ptr<grpc::ClientContext> context_;
65   std::unique_ptr<grpc::ClientReaderInterface<T>> reader_;
66 };
67 
68 class DefaultStorageStub : public StorageStub {
69  public:
DefaultStorageStub(std::unique_ptr<bigquerystorage_proto::BigQueryStorage::StubInterface> grpc_stub)70   explicit DefaultStorageStub(
71       std::unique_ptr<bigquerystorage_proto::BigQueryStorage::StubInterface>
72           grpc_stub)
73       : grpc_stub_(std::move(grpc_stub)) {}
74 
75   google::cloud::StatusOr<bigquerystorage_proto::ReadSession> CreateReadSession(
76       bigquerystorage_proto::CreateReadSessionRequest const& request) override;
77 
78   std::unique_ptr<StreamReader<bigquerystorage_proto::ReadRowsResponse>>
79   ReadRows(bigquerystorage_proto::ReadRowsRequest const& request) override;
80 
81  private:
82   std::unique_ptr<bigquerystorage_proto::BigQueryStorage::StubInterface>
83       grpc_stub_;
84 };
85 
86 google::cloud::StatusOr<bigquerystorage_proto::ReadSession>
CreateReadSession(bigquerystorage_proto::CreateReadSessionRequest const & request)87 DefaultStorageStub::CreateReadSession(
88     bigquerystorage_proto::CreateReadSessionRequest const& request) {
89   bigquerystorage_proto::ReadSession response;
90   grpc::ClientContext client_context;
91 
92   // For performance reasons, the Google routing layer does not parse
93   // request messages. As such, we must hoist the values required for
94   // routing into a header.
95   //
96   // TODO(aryann): Replace the below string concatenations with
97   // absl::Substitute.
98   //
99   // TODO(aryann): URL escape the project ID and dataset ID before
100   // placing them into the routing header.
101   std::string routing_header = "table_reference.project_id=";
102   routing_header += request.table_reference().project_id();
103   routing_header += "&table_reference.dataset_id=";
104   routing_header += request.table_reference().dataset_id();
105   client_context.AddMetadata(kRoutingHeader, routing_header);
106 
107   grpc::Status grpc_status =
108       grpc_stub_->CreateReadSession(&client_context, request, &response);
109   if (!grpc_status.ok()) {
110     return MakeStatusFromRpcError(grpc_status);
111   }
112   return response;
113 }
114 
115 std::unique_ptr<StreamReader<bigquerystorage_proto::ReadRowsResponse>>
ReadRows(bigquerystorage_proto::ReadRowsRequest const & request)116 DefaultStorageStub::ReadRows(
117     bigquerystorage_proto::ReadRowsRequest const& request) {
118   // TODO(aryann): Replace this with `absl::make_unique`.
119   auto client_context =
120       std::unique_ptr<grpc::ClientContext>(new grpc::ClientContext);
121 
122   // TODO(aryann): Replace the below string concatenations with
123   // absl::Substitute.
124   //
125   // TODO(aryann): URL escape the project ID and dataset ID before
126   // placing them into the routing header.
127   std::string routing_header = "read_position.stream.name=";
128   routing_header += request.read_position().stream().name();
129   client_context->AddMetadata(kRoutingHeader, routing_header);
130 
131   auto stream = grpc_stub_->ReadRows(client_context.get(), request);
132   // TODO(aryann): Replace this with `absl::make_unique`.
133   return std::unique_ptr<StreamReader<bigquerystorage_proto::ReadRowsResponse>>(
134       new GrpcStreamReader<bigquerystorage_proto::ReadRowsResponse>(
135           std::move(client_context), std::move(stream)));
136 }
137 
138 }  // namespace
139 
MakeDefaultStorageStub(ConnectionOptions const & options)140 std::shared_ptr<StorageStub> MakeDefaultStorageStub(
141     ConnectionOptions const& options) {
142   auto grpc_stub =
143       bigquerystorage_proto::BigQueryStorage::NewStub(grpc::CreateCustomChannel(
144           options.bigquerystorage_endpoint(), options.credentials(),
145           options.CreateChannelArguments()));
146 
147   return std::make_shared<DefaultStorageStub>(std::move(grpc_stub));
148 }
149 
150 }  // namespace internal
151 }  // namespace BIGQUERY_CLIENT_NS
152 }  // namespace bigquery
153 }  // namespace cloud
154 }  // namespace google
155