1 /*
2   Copyright (c) DataStax, Inc.
3 
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7 
8   http://www.apache.org/licenses/LICENSE-2.0
9 
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15 */
16 
17 #include "cloud_secure_connection_config.hpp"
18 
19 #include "cluster.hpp"
20 #include "cluster_metadata_resolver.hpp"
21 #include "config.hpp"
22 #include "dse_auth.hpp"
23 #include "http_client.hpp"
24 #include "json.hpp"
25 #include "logger.hpp"
26 #include "ssl.hpp"
27 #include "utils.hpp"
28 
29 using namespace datastax;
30 using namespace datastax::internal;
31 using namespace datastax::internal::core;
32 
33 #define CLOUD_ERROR "Unable to load cloud secure connection configuration: "
34 #define METADATA_SERVER_ERROR "Unable to configure driver from metadata server: "
35 
36 // Pinned to v1 because that's what the driver currently handles.
37 #define METADATA_SERVER_PATH "/metadata?version=1"
38 
39 #define METADATA_SERVER_PORT 30443
40 #define RESPONSE_BODY_TRUNCATE_LENGTH 1024
41 
42 #ifdef HAVE_ZLIB
43 #include "unzip.h"
44 
45 #define CONFIGURATION_FILE "config.json"
46 #define CERTIFICATE_AUTHORITY_FILE "ca.crt"
47 #define CERTIFICATE_FILE "cert"
48 #define KEY_FILE "key"
49 
50 class UnzipFile {
51 public:
UnzipFile()52   UnzipFile()
53       : file(NULL) {}
54 
~UnzipFile()55   ~UnzipFile() { unzClose(file); }
56 
open(const String & filename)57   bool open(const String& filename) { return (file = unzOpen(filename.c_str())) != NULL; }
58 
read_contents(const String & filename,String * contents)59   bool read_contents(const String& filename, String* contents) {
60     int rc = unzLocateFile(file, filename.c_str(), 0);
61     if (rc != UNZ_OK) {
62       return false;
63     }
64 
65     rc = unzOpenCurrentFile(file);
66     if (rc != UNZ_OK) {
67       return false;
68     }
69 
70     unz_file_info file_info;
71     rc = unzGetCurrentFileInfo(file, &file_info, 0, 0, 0, 0, 0, 0);
72     if (rc != UNZ_OK) {
73       unzCloseCurrentFile(file);
74       return false;
75     }
76 
77     contents->resize(file_info.uncompressed_size, 0);
78     unzReadCurrentFile(file, &(*contents)[0], contents->size());
79     unzCloseCurrentFile(file);
80 
81     return true;
82   }
83 
84 private:
85   unzFile file;
86 };
87 #endif
88 
89 namespace {
90 
91 class CloudClusterMetadataResolver : public ClusterMetadataResolver {
92 public:
CloudClusterMetadataResolver(const String & host,int port,const SocketSettings & settings,uint64_t request_timeout_ms)93   CloudClusterMetadataResolver(const String& host, int port, const SocketSettings& settings,
94                                uint64_t request_timeout_ms)
95       : client_(new HttpClient(Address(host, port), METADATA_SERVER_PATH,
96                                bind_callback(&CloudClusterMetadataResolver::on_response, this))) {
97     client_->with_settings(settings)->with_request_timeout_ms(request_timeout_ms);
98   }
99 
100 private:
internal_resolve(uv_loop_t * loop,const AddressVec & contact_points)101   virtual void internal_resolve(uv_loop_t* loop, const AddressVec& contact_points) {
102     inc_ref();
103     client_->request(loop);
104   }
105 
internal_cancel()106   virtual void internal_cancel() { client_->cancel(); }
107 
108 private:
on_response(HttpClient * http_client)109   void on_response(HttpClient* http_client) {
110     if (http_client->is_ok()) {
111       if (http_client->content_type().find("json") != std::string::npos) {
112         parse_metadata(http_client->response_body());
113       } else {
114         LOG_ERROR(METADATA_SERVER_ERROR "Invalid response content type: '%s'",
115                   http_client->content_type().c_str());
116       }
117     } else if (!http_client->is_canceled()) {
118       if (http_client->is_error_status_code()) {
119         String error_message =
120             http_client->response_body().substr(0, RESPONSE_BODY_TRUNCATE_LENGTH);
121         if (http_client->content_type().find("json") != std::string::npos) {
122           json::Document document;
123           document.Parse(http_client->response_body().c_str());
124           if (document.IsObject() && document.HasMember("message") &&
125               document["message"].IsString()) {
126             error_message = document["message"].GetString();
127           }
128         }
129         LOG_ERROR(METADATA_SERVER_ERROR "Returned error response code %u: '%s'",
130                   http_client->status_code(), error_message.c_str());
131       } else {
132         LOG_ERROR(METADATA_SERVER_ERROR "%s", http_client->error_message().c_str());
133       }
134     }
135 
136     callback_(this);
137     dec_ref();
138   }
139 
parse_metadata(const String & response_body)140   void parse_metadata(const String& response_body) {
141     json::Document document;
142     document.Parse(response_body.c_str());
143 
144     if (!document.IsObject()) {
145       LOG_ERROR(METADATA_SERVER_ERROR "Metadata JSON is invalid");
146       return;
147     }
148 
149     if (!document.HasMember("contact_info") || !document["contact_info"].IsObject()) {
150       LOG_ERROR(METADATA_SERVER_ERROR "Contact information is not available");
151       return;
152     }
153 
154     const json::Value& contact_info = document["contact_info"];
155 
156     if (!contact_info.HasMember("local_dc") || !contact_info["local_dc"].IsString()) {
157       LOG_ERROR(METADATA_SERVER_ERROR "Local DC is not available");
158       return;
159     }
160 
161     local_dc_ = contact_info["local_dc"].GetString();
162 
163     if (!contact_info.HasMember("sni_proxy_address") ||
164         !contact_info["sni_proxy_address"].IsString()) {
165       LOG_ERROR(METADATA_SERVER_ERROR "SNI proxy address is not available");
166       return;
167     }
168 
169     int sni_port = METADATA_SERVER_PORT;
170     Vector<String> tokens;
171     explode(contact_info["sni_proxy_address"].GetString(), tokens, ':');
172     String sni_address = tokens[0];
173     if (tokens.size() == 2) {
174       IStringStream ss(tokens[1]);
175       if ((ss >> sni_port).fail()) {
176         LOG_WARN(METADATA_SERVER_ERROR "Invalid port, default %d will be used",
177                  METADATA_SERVER_PORT);
178       }
179     }
180 
181     if (!contact_info.HasMember("contact_points") || !contact_info["contact_points"].IsArray()) {
182       LOG_ERROR(METADATA_SERVER_ERROR "Contact points are not available");
183       return;
184     }
185 
186     const json::Value& contact_points = contact_info["contact_points"];
187     for (rapidjson::SizeType i = 0; i < contact_points.Size(); ++i) {
188       if (contact_points[i].IsString()) {
189         String host_id = contact_points[i].GetString();
190         resolved_contact_points_.push_back(Address(sni_address, sni_port, host_id));
191       }
192     }
193   }
194 
195 private:
196   HttpClient::Ptr client_;
197 };
198 
199 class CloudClusterMetadataResolverFactory : public ClusterMetadataResolverFactory {
200 public:
CloudClusterMetadataResolverFactory(const String & host,int port)201   CloudClusterMetadataResolverFactory(const String& host, int port)
202       : host_(host)
203       , port_(port) {}
204 
new_instance(const ClusterSettings & settings) const205   virtual ClusterMetadataResolver::Ptr new_instance(const ClusterSettings& settings) const {
206     return ClusterMetadataResolver::Ptr(new CloudClusterMetadataResolver(
207         host_, port_, settings.control_connection_settings.connection_settings.socket_settings,
208         settings.control_connection_settings.connection_settings.connect_timeout_ms));
209   }
210 
name() const211   virtual const char* name() const { return "Cloud"; }
212 
213 private:
214   String host_;
215   int port_;
216 };
217 
218 } // namespace
219 
CloudSecureConnectionConfig()220 CloudSecureConnectionConfig::CloudSecureConnectionConfig()
221     : is_loaded_(false)
222     , port_(0) {}
223 
load(const String & filename,Config * config)224 bool CloudSecureConnectionConfig::load(const String& filename, Config* config /* = NULL */) {
225 #ifndef HAVE_ZLIB
226   LOG_ERROR(CLOUD_ERROR "Driver was not built with zlib support");
227   return false;
228 #else
229   UnzipFile zip_file;
230   if (!zip_file.open(filename.c_str())) {
231     LOG_ERROR(CLOUD_ERROR "Unable to open zip file %s; file does not exist or is invalid",
232               filename.c_str());
233     return false;
234   }
235 
236   String contents;
237   if (!zip_file.read_contents(CONFIGURATION_FILE, &contents)) {
238     LOG_ERROR(CLOUD_ERROR "Missing configuration file %s", CONFIGURATION_FILE);
239     return false;
240   }
241 
242   json::MemoryStream memory_stream(contents.c_str(), contents.size());
243   json::AutoUTFMemoryInputStream auto_utf_stream(memory_stream);
244   json::Document document;
245   document.ParseStream(auto_utf_stream);
246   if (!document.IsObject()) {
247     LOG_ERROR(CLOUD_ERROR "Invalid configuration");
248     return false;
249   }
250 
251   if (document.HasMember("username") && document["username"].IsString()) {
252     username_ = document["username"].GetString();
253   }
254   if (document.HasMember("password") && document["password"].IsString()) {
255     password_ = document["password"].GetString();
256   }
257 
258   if (config && (!username_.empty() || !password_.empty())) {
259     config->set_auth_provider(
260         AuthProvider::Ptr(new enterprise::DsePlainTextAuthProvider(username_, password_, "")));
261   }
262 
263   if (!document.HasMember("host") || !document["host"].IsString()) {
264     LOG_ERROR(CLOUD_ERROR "Missing host");
265     return false;
266   }
267   if (!document.HasMember("port") || !document["port"].IsInt()) {
268     LOG_ERROR(CLOUD_ERROR "Missing port");
269     return false;
270   }
271   host_ = document["host"].GetString();
272   port_ = document["port"].GetInt();
273 
274   if (!zip_file.read_contents(CERTIFICATE_AUTHORITY_FILE, &ca_cert_)) {
275     LOG_ERROR(CLOUD_ERROR "Missing certificate authority file %s", CERTIFICATE_AUTHORITY_FILE);
276     return false;
277   }
278 
279   if (!zip_file.read_contents(CERTIFICATE_FILE, &cert_)) {
280     LOG_ERROR(CLOUD_ERROR "Missing certificate file %s", CERTIFICATE_FILE);
281     return false;
282   }
283 
284   if (!zip_file.read_contents(KEY_FILE, &key_)) {
285     LOG_ERROR(CLOUD_ERROR "Missing key file %s", KEY_FILE);
286     return false;
287   }
288 
289   if (config) {
290     SslContext::Ptr ssl_context(SslContextFactory::create());
291 
292     ssl_context->set_verify_flags(CASS_SSL_VERIFY_PEER_CERT | CASS_SSL_VERIFY_PEER_IDENTITY_DNS);
293 
294     if (ssl_context->add_trusted_cert(ca_cert_.c_str(), ca_cert_.length()) != CASS_OK) {
295       LOG_ERROR(CLOUD_ERROR "Invalid CA certificate %s", CERTIFICATE_AUTHORITY_FILE);
296       return false;
297     }
298 
299     if (ssl_context->set_cert(cert_.c_str(), cert_.length()) != CASS_OK) {
300       LOG_ERROR(CLOUD_ERROR "Invalid client certificate %s", CERTIFICATE_FILE);
301       return false;
302     }
303 
304     if (ssl_context->set_private_key(key_.c_str(), key_.length(), NULL, 0) != CASS_OK) {
305       LOG_ERROR(CLOUD_ERROR "Invalid client private key %s", KEY_FILE);
306       return false;
307     }
308 
309     config->set_ssl_context(ssl_context);
310 
311     config->set_cluster_metadata_resolver_factory(
312         ClusterMetadataResolverFactory::Ptr(new CloudClusterMetadataResolverFactory(host_, port_)));
313   }
314 
315   is_loaded_ = true;
316   return true;
317 #endif
318 }
319