1 // Copyright 2019 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "google/cloud/bigquery/internal/storage_stub.h"
16 #include "google/cloud/bigquery/connection.h"
17 #include "google/cloud/bigquery/connection_options.h"
18 #include "google/cloud/bigquery/internal/stream_reader.h"
19 #include "google/cloud/bigquery/version.h"
20 #include "google/cloud/grpc_error_delegate.h"
21 #include "google/cloud/status_or.h"
22 #include "absl/types/optional.h"
23 #include <google/cloud/bigquery/storage/v1beta1/storage.grpc.pb.h>
24 #include <google/cloud/bigquery/storage/v1beta1/storage.pb.h>
25 #include <grpcpp/create_channel.h>
26 #include <memory>
27
28 namespace google {
29 namespace cloud {
30 namespace bigquery {
31 inline namespace BIGQUERY_CLIENT_NS {
32 namespace internal {
33
34 namespace {
35
36 constexpr auto kRoutingHeader = "x-goog-request-params";
37
38 namespace bigquerystorage_proto = ::google::cloud::bigquery::storage::v1beta1;
39
40 using ::google::cloud::MakeStatusFromRpcError;
41 using ::google::cloud::StatusOr;
42
43 // An implementation of StreamReader for gRPC unary-streaming methods.
44 template <class T>
45 class GrpcStreamReader : public StreamReader<T> {
46 public:
GrpcStreamReader(std::unique_ptr<grpc::ClientContext> context,std::unique_ptr<grpc::ClientReaderInterface<T>> reader)47 GrpcStreamReader(std::unique_ptr<grpc::ClientContext> context,
48 std::unique_ptr<grpc::ClientReaderInterface<T>> reader)
49 : context_(std::move(context)), reader_(std::move(reader)) {}
50
NextValue()51 StatusOr<absl::optional<T>> NextValue() override {
52 T t;
53 if (reader_->Read(&t)) {
54 return absl::optional<T>(t);
55 }
56 grpc::Status grpc_status = reader_->Finish();
57 if (!grpc_status.ok()) {
58 return MakeStatusFromRpcError(grpc_status);
59 }
60 return absl::optional<T>();
61 }
62
63 private:
64 std::unique_ptr<grpc::ClientContext> context_;
65 std::unique_ptr<grpc::ClientReaderInterface<T>> reader_;
66 };
67
68 class DefaultStorageStub : public StorageStub {
69 public:
DefaultStorageStub(std::unique_ptr<bigquerystorage_proto::BigQueryStorage::StubInterface> grpc_stub)70 explicit DefaultStorageStub(
71 std::unique_ptr<bigquerystorage_proto::BigQueryStorage::StubInterface>
72 grpc_stub)
73 : grpc_stub_(std::move(grpc_stub)) {}
74
75 google::cloud::StatusOr<bigquerystorage_proto::ReadSession> CreateReadSession(
76 bigquerystorage_proto::CreateReadSessionRequest const& request) override;
77
78 std::unique_ptr<StreamReader<bigquerystorage_proto::ReadRowsResponse>>
79 ReadRows(bigquerystorage_proto::ReadRowsRequest const& request) override;
80
81 private:
82 std::unique_ptr<bigquerystorage_proto::BigQueryStorage::StubInterface>
83 grpc_stub_;
84 };
85
86 google::cloud::StatusOr<bigquerystorage_proto::ReadSession>
CreateReadSession(bigquerystorage_proto::CreateReadSessionRequest const & request)87 DefaultStorageStub::CreateReadSession(
88 bigquerystorage_proto::CreateReadSessionRequest const& request) {
89 bigquerystorage_proto::ReadSession response;
90 grpc::ClientContext client_context;
91
92 // For performance reasons, the Google routing layer does not parse
93 // request messages. As such, we must hoist the values required for
94 // routing into a header.
95 //
96 // TODO(aryann): Replace the below string concatenations with
97 // absl::Substitute.
98 //
99 // TODO(aryann): URL escape the project ID and dataset ID before
100 // placing them into the routing header.
101 std::string routing_header = "table_reference.project_id=";
102 routing_header += request.table_reference().project_id();
103 routing_header += "&table_reference.dataset_id=";
104 routing_header += request.table_reference().dataset_id();
105 client_context.AddMetadata(kRoutingHeader, routing_header);
106
107 grpc::Status grpc_status =
108 grpc_stub_->CreateReadSession(&client_context, request, &response);
109 if (!grpc_status.ok()) {
110 return MakeStatusFromRpcError(grpc_status);
111 }
112 return response;
113 }
114
115 std::unique_ptr<StreamReader<bigquerystorage_proto::ReadRowsResponse>>
ReadRows(bigquerystorage_proto::ReadRowsRequest const & request)116 DefaultStorageStub::ReadRows(
117 bigquerystorage_proto::ReadRowsRequest const& request) {
118 // TODO(aryann): Replace this with `absl::make_unique`.
119 auto client_context =
120 std::unique_ptr<grpc::ClientContext>(new grpc::ClientContext);
121
122 // TODO(aryann): Replace the below string concatenations with
123 // absl::Substitute.
124 //
125 // TODO(aryann): URL escape the project ID and dataset ID before
126 // placing them into the routing header.
127 std::string routing_header = "read_position.stream.name=";
128 routing_header += request.read_position().stream().name();
129 client_context->AddMetadata(kRoutingHeader, routing_header);
130
131 auto stream = grpc_stub_->ReadRows(client_context.get(), request);
132 // TODO(aryann): Replace this with `absl::make_unique`.
133 return std::unique_ptr<StreamReader<bigquerystorage_proto::ReadRowsResponse>>(
134 new GrpcStreamReader<bigquerystorage_proto::ReadRowsResponse>(
135 std::move(client_context), std::move(stream)));
136 }
137
138 } // namespace
139
MakeDefaultStorageStub(ConnectionOptions const & options)140 std::shared_ptr<StorageStub> MakeDefaultStorageStub(
141 ConnectionOptions const& options) {
142 auto grpc_stub =
143 bigquerystorage_proto::BigQueryStorage::NewStub(grpc::CreateCustomChannel(
144 options.bigquerystorage_endpoint(), options.credentials(),
145 options.CreateChannelArguments()));
146
147 return std::make_shared<DefaultStorageStub>(std::move(grpc_stub));
148 }
149
150 } // namespace internal
151 } // namespace BIGQUERY_CLIENT_NS
152 } // namespace bigquery
153 } // namespace cloud
154 } // namespace google
155