1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 #include "arrow/dbi/hiveserver2/service.h"
19
20 #include <thrift/protocol/TBinaryProtocol.h>
21 #include <thrift/transport/TSocket.h>
22 #include <thrift/transport/TTransportUtils.h>
23 #include <sstream>
24
25 #include "arrow/dbi/hiveserver2/session.h"
26 #include "arrow/dbi/hiveserver2/thrift_internal.h"
27
28 #include "arrow/dbi/hiveserver2/ImpalaHiveServer2Service.h"
29 #include "arrow/dbi/hiveserver2/TCLIService.h"
30
31 #include "arrow/status.h"
32 #include "arrow/util/logging.h"
33
34 namespace hs2 = apache::hive::service::cli::thrift;
35
36 using apache::thrift::TException;
37 using apache::thrift::protocol::TBinaryProtocol;
38 using apache::thrift::protocol::TProtocol;
39 using apache::thrift::transport::TBufferedTransport;
40 using apache::thrift::transport::TSocket;
41 using apache::thrift::transport::TTransport;
42
43 namespace arrow {
44 namespace hiveserver2 {
45
46 struct Service::ServiceImpl {
47 hs2::TProtocolVersion::type protocol_version;
48 std::shared_ptr<TSocket> socket;
49 std::shared_ptr<TTransport> transport;
50 std::shared_ptr<TProtocol> protocol;
51 };
52
Connect(const std::string & host,int port,int conn_timeout,ProtocolVersion protocol_version,std::unique_ptr<Service> * service)53 Status Service::Connect(const std::string& host, int port, int conn_timeout,
54 ProtocolVersion protocol_version,
55 std::unique_ptr<Service>* service) {
56 service->reset(new Service(host, port, conn_timeout, protocol_version));
57 return (*service)->Open();
58 }
59
~Service()60 Service::~Service() { DCHECK(!IsConnected()); }
61
Close()62 Status Service::Close() {
63 if (!IsConnected()) return Status::OK();
64 TRY_RPC_OR_RETURN(impl_->transport->close());
65 return Status::OK();
66 }
67
IsConnected() const68 bool Service::IsConnected() const {
69 return impl_->transport && impl_->transport->isOpen();
70 }
71
SetRecvTimeout(int timeout)72 void Service::SetRecvTimeout(int timeout) { impl_->socket->setRecvTimeout(timeout); }
73
SetSendTimeout(int timeout)74 void Service::SetSendTimeout(int timeout) { impl_->socket->setSendTimeout(timeout); }
75
OpenSession(const std::string & user,const HS2ClientConfig & config,std::unique_ptr<Session> * session) const76 Status Service::OpenSession(const std::string& user, const HS2ClientConfig& config,
77 std::unique_ptr<Session>* session) const {
78 session->reset(new Session(rpc_));
79 return (*session)->Open(config, user);
80 }
81
Service(const std::string & host,int port,int conn_timeout,ProtocolVersion protocol_version)82 Service::Service(const std::string& host, int port, int conn_timeout,
83 ProtocolVersion protocol_version)
84 : host_(host),
85 port_(port),
86 conn_timeout_(conn_timeout),
87 impl_(new ServiceImpl()),
88 rpc_(new ThriftRPC()) {
89 impl_->protocol_version = ProtocolVersionToTProtocolVersion(protocol_version);
90 }
91
Open()92 Status Service::Open() {
93 if (impl_->protocol_version < hs2::TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V6) {
94 return Status::NotImplemented("Unsupported protocol: ", impl_->protocol_version);
95 }
96
97 impl_->socket.reset(new TSocket(host_, port_));
98 impl_->socket->setConnTimeout(conn_timeout_);
99 impl_->transport.reset(new TBufferedTransport(impl_->socket));
100 impl_->protocol.reset(new TBinaryProtocol(impl_->transport));
101
102 rpc_->client.reset(new impala::ImpalaHiveServer2ServiceClient(impl_->protocol));
103
104 TRY_RPC_OR_RETURN(impl_->transport->open());
105
106 return Status::OK();
107 }
108
109 } // namespace hiveserver2
110 } // namespace arrow
111