1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 #include "arrow/dbi/hiveserver2/service.h"
19 
20 #include <thrift/protocol/TBinaryProtocol.h>
21 #include <thrift/transport/TSocket.h>
22 #include <thrift/transport/TTransportUtils.h>
23 #include <sstream>
24 
25 #include "arrow/dbi/hiveserver2/session.h"
26 #include "arrow/dbi/hiveserver2/thrift_internal.h"
27 
28 #include "arrow/dbi/hiveserver2/ImpalaHiveServer2Service.h"
29 #include "arrow/dbi/hiveserver2/TCLIService.h"
30 
31 #include "arrow/status.h"
32 #include "arrow/util/logging.h"
33 
34 namespace hs2 = apache::hive::service::cli::thrift;
35 
36 using apache::thrift::TException;
37 using apache::thrift::protocol::TBinaryProtocol;
38 using apache::thrift::protocol::TProtocol;
39 using apache::thrift::transport::TBufferedTransport;
40 using apache::thrift::transport::TSocket;
41 using apache::thrift::transport::TTransport;
42 
43 namespace arrow {
44 namespace hiveserver2 {
45 
46 struct Service::ServiceImpl {
47   hs2::TProtocolVersion::type protocol_version;
48   std::shared_ptr<TSocket> socket;
49   std::shared_ptr<TTransport> transport;
50   std::shared_ptr<TProtocol> protocol;
51 };
52 
Connect(const std::string & host,int port,int conn_timeout,ProtocolVersion protocol_version,std::unique_ptr<Service> * service)53 Status Service::Connect(const std::string& host, int port, int conn_timeout,
54                         ProtocolVersion protocol_version,
55                         std::unique_ptr<Service>* service) {
56   service->reset(new Service(host, port, conn_timeout, protocol_version));
57   return (*service)->Open();
58 }
59 
~Service()60 Service::~Service() { DCHECK(!IsConnected()); }
61 
Close()62 Status Service::Close() {
63   if (!IsConnected()) return Status::OK();
64   TRY_RPC_OR_RETURN(impl_->transport->close());
65   return Status::OK();
66 }
67 
IsConnected() const68 bool Service::IsConnected() const {
69   return impl_->transport && impl_->transport->isOpen();
70 }
71 
SetRecvTimeout(int timeout)72 void Service::SetRecvTimeout(int timeout) { impl_->socket->setRecvTimeout(timeout); }
73 
SetSendTimeout(int timeout)74 void Service::SetSendTimeout(int timeout) { impl_->socket->setSendTimeout(timeout); }
75 
OpenSession(const std::string & user,const HS2ClientConfig & config,std::unique_ptr<Session> * session) const76 Status Service::OpenSession(const std::string& user, const HS2ClientConfig& config,
77                             std::unique_ptr<Session>* session) const {
78   session->reset(new Session(rpc_));
79   return (*session)->Open(config, user);
80 }
81 
Service(const std::string & host,int port,int conn_timeout,ProtocolVersion protocol_version)82 Service::Service(const std::string& host, int port, int conn_timeout,
83                  ProtocolVersion protocol_version)
84     : host_(host),
85       port_(port),
86       conn_timeout_(conn_timeout),
87       impl_(new ServiceImpl()),
88       rpc_(new ThriftRPC()) {
89   impl_->protocol_version = ProtocolVersionToTProtocolVersion(protocol_version);
90 }
91 
Open()92 Status Service::Open() {
93   if (impl_->protocol_version < hs2::TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V6) {
94     return Status::NotImplemented("Unsupported protocol: ", impl_->protocol_version);
95   }
96 
97   impl_->socket.reset(new TSocket(host_, port_));
98   impl_->socket->setConnTimeout(conn_timeout_);
99   impl_->transport.reset(new TBufferedTransport(impl_->socket));
100   impl_->protocol.reset(new TBinaryProtocol(impl_->transport));
101 
102   rpc_->client.reset(new impala::ImpalaHiveServer2ServiceClient(impl_->protocol));
103 
104   TRY_RPC_OR_RETURN(impl_->transport->open());
105 
106   return Status::OK();
107 }
108 
109 }  // namespace hiveserver2
110 }  // namespace arrow
111