1 /*
2 Copyright (c) DataStax, Inc.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #include "cloud_secure_connection_config.hpp"
18
19 #include "cluster.hpp"
20 #include "cluster_metadata_resolver.hpp"
21 #include "config.hpp"
22 #include "dse_auth.hpp"
23 #include "http_client.hpp"
24 #include "json.hpp"
25 #include "logger.hpp"
26 #include "ssl.hpp"
27 #include "utils.hpp"
28
29 using namespace datastax;
30 using namespace datastax::internal;
31 using namespace datastax::internal::core;
32
33 #define CLOUD_ERROR "Unable to load cloud secure connection configuration: "
34 #define METADATA_SERVER_ERROR "Unable to configure driver from metadata server: "
35
36 // Pinned to v1 because that's what the driver currently handles.
37 #define METADATA_SERVER_PATH "/metadata?version=1"
38
39 #define METADATA_SERVER_PORT 30443
40 #define RESPONSE_BODY_TRUNCATE_LENGTH 1024
41
42 #ifdef HAVE_ZLIB
43 #include "unzip.h"
44
45 #define CONFIGURATION_FILE "config.json"
46 #define CERTIFICATE_AUTHORITY_FILE "ca.crt"
47 #define CERTIFICATE_FILE "cert"
48 #define KEY_FILE "key"
49
50 class UnzipFile {
51 public:
UnzipFile()52 UnzipFile()
53 : file(NULL) {}
54
~UnzipFile()55 ~UnzipFile() { unzClose(file); }
56
open(const String & filename)57 bool open(const String& filename) { return (file = unzOpen(filename.c_str())) != NULL; }
58
read_contents(const String & filename,String * contents)59 bool read_contents(const String& filename, String* contents) {
60 int rc = unzLocateFile(file, filename.c_str(), 0);
61 if (rc != UNZ_OK) {
62 return false;
63 }
64
65 rc = unzOpenCurrentFile(file);
66 if (rc != UNZ_OK) {
67 return false;
68 }
69
70 unz_file_info file_info;
71 rc = unzGetCurrentFileInfo(file, &file_info, 0, 0, 0, 0, 0, 0);
72 if (rc != UNZ_OK) {
73 unzCloseCurrentFile(file);
74 return false;
75 }
76
77 contents->resize(file_info.uncompressed_size, 0);
78 unzReadCurrentFile(file, &(*contents)[0], contents->size());
79 unzCloseCurrentFile(file);
80
81 return true;
82 }
83
84 private:
85 unzFile file;
86 };
87 #endif
88
89 namespace {
90
91 class CloudClusterMetadataResolver : public ClusterMetadataResolver {
92 public:
CloudClusterMetadataResolver(const String & host,int port,const SocketSettings & settings,uint64_t request_timeout_ms)93 CloudClusterMetadataResolver(const String& host, int port, const SocketSettings& settings,
94 uint64_t request_timeout_ms)
95 : client_(new HttpClient(Address(host, port), METADATA_SERVER_PATH,
96 bind_callback(&CloudClusterMetadataResolver::on_response, this))) {
97 client_->with_settings(settings)->with_request_timeout_ms(request_timeout_ms);
98 }
99
100 private:
internal_resolve(uv_loop_t * loop,const AddressVec & contact_points)101 virtual void internal_resolve(uv_loop_t* loop, const AddressVec& contact_points) {
102 inc_ref();
103 client_->request(loop);
104 }
105
internal_cancel()106 virtual void internal_cancel() { client_->cancel(); }
107
108 private:
on_response(HttpClient * http_client)109 void on_response(HttpClient* http_client) {
110 if (http_client->is_ok()) {
111 if (http_client->content_type().find("json") != std::string::npos) {
112 parse_metadata(http_client->response_body());
113 } else {
114 LOG_ERROR(METADATA_SERVER_ERROR "Invalid response content type: '%s'",
115 http_client->content_type().c_str());
116 }
117 } else if (!http_client->is_canceled()) {
118 if (http_client->is_error_status_code()) {
119 String error_message =
120 http_client->response_body().substr(0, RESPONSE_BODY_TRUNCATE_LENGTH);
121 if (http_client->content_type().find("json") != std::string::npos) {
122 json::Document document;
123 document.Parse(http_client->response_body().c_str());
124 if (document.IsObject() && document.HasMember("message") &&
125 document["message"].IsString()) {
126 error_message = document["message"].GetString();
127 }
128 }
129 LOG_ERROR(METADATA_SERVER_ERROR "Returned error response code %u: '%s'",
130 http_client->status_code(), error_message.c_str());
131 } else {
132 LOG_ERROR(METADATA_SERVER_ERROR "%s", http_client->error_message().c_str());
133 }
134 }
135
136 callback_(this);
137 dec_ref();
138 }
139
parse_metadata(const String & response_body)140 void parse_metadata(const String& response_body) {
141 json::Document document;
142 document.Parse(response_body.c_str());
143
144 if (!document.IsObject()) {
145 LOG_ERROR(METADATA_SERVER_ERROR "Metadata JSON is invalid");
146 return;
147 }
148
149 if (!document.HasMember("contact_info") || !document["contact_info"].IsObject()) {
150 LOG_ERROR(METADATA_SERVER_ERROR "Contact information is not available");
151 return;
152 }
153
154 const json::Value& contact_info = document["contact_info"];
155
156 if (!contact_info.HasMember("local_dc") || !contact_info["local_dc"].IsString()) {
157 LOG_ERROR(METADATA_SERVER_ERROR "Local DC is not available");
158 return;
159 }
160
161 local_dc_ = contact_info["local_dc"].GetString();
162
163 if (!contact_info.HasMember("sni_proxy_address") ||
164 !contact_info["sni_proxy_address"].IsString()) {
165 LOG_ERROR(METADATA_SERVER_ERROR "SNI proxy address is not available");
166 return;
167 }
168
169 int sni_port = METADATA_SERVER_PORT;
170 Vector<String> tokens;
171 explode(contact_info["sni_proxy_address"].GetString(), tokens, ':');
172 String sni_address = tokens[0];
173 if (tokens.size() == 2) {
174 IStringStream ss(tokens[1]);
175 if ((ss >> sni_port).fail()) {
176 LOG_WARN(METADATA_SERVER_ERROR "Invalid port, default %d will be used",
177 METADATA_SERVER_PORT);
178 }
179 }
180
181 if (!contact_info.HasMember("contact_points") || !contact_info["contact_points"].IsArray()) {
182 LOG_ERROR(METADATA_SERVER_ERROR "Contact points are not available");
183 return;
184 }
185
186 const json::Value& contact_points = contact_info["contact_points"];
187 for (rapidjson::SizeType i = 0; i < contact_points.Size(); ++i) {
188 if (contact_points[i].IsString()) {
189 String host_id = contact_points[i].GetString();
190 resolved_contact_points_.push_back(Address(sni_address, sni_port, host_id));
191 }
192 }
193 }
194
195 private:
196 HttpClient::Ptr client_;
197 };
198
199 class CloudClusterMetadataResolverFactory : public ClusterMetadataResolverFactory {
200 public:
CloudClusterMetadataResolverFactory(const String & host,int port)201 CloudClusterMetadataResolverFactory(const String& host, int port)
202 : host_(host)
203 , port_(port) {}
204
new_instance(const ClusterSettings & settings) const205 virtual ClusterMetadataResolver::Ptr new_instance(const ClusterSettings& settings) const {
206 return ClusterMetadataResolver::Ptr(new CloudClusterMetadataResolver(
207 host_, port_, settings.control_connection_settings.connection_settings.socket_settings,
208 settings.control_connection_settings.connection_settings.connect_timeout_ms));
209 }
210
name() const211 virtual const char* name() const { return "Cloud"; }
212
213 private:
214 String host_;
215 int port_;
216 };
217
218 } // namespace
219
CloudSecureConnectionConfig()220 CloudSecureConnectionConfig::CloudSecureConnectionConfig()
221 : is_loaded_(false)
222 , port_(0) {}
223
load(const String & filename,Config * config)224 bool CloudSecureConnectionConfig::load(const String& filename, Config* config /* = NULL */) {
225 #ifndef HAVE_ZLIB
226 LOG_ERROR(CLOUD_ERROR "Driver was not built with zlib support");
227 return false;
228 #else
229 UnzipFile zip_file;
230 if (!zip_file.open(filename.c_str())) {
231 LOG_ERROR(CLOUD_ERROR "Unable to open zip file %s; file does not exist or is invalid",
232 filename.c_str());
233 return false;
234 }
235
236 String contents;
237 if (!zip_file.read_contents(CONFIGURATION_FILE, &contents)) {
238 LOG_ERROR(CLOUD_ERROR "Missing configuration file %s", CONFIGURATION_FILE);
239 return false;
240 }
241
242 json::MemoryStream memory_stream(contents.c_str(), contents.size());
243 json::AutoUTFMemoryInputStream auto_utf_stream(memory_stream);
244 json::Document document;
245 document.ParseStream(auto_utf_stream);
246 if (!document.IsObject()) {
247 LOG_ERROR(CLOUD_ERROR "Invalid configuration");
248 return false;
249 }
250
251 if (document.HasMember("username") && document["username"].IsString()) {
252 username_ = document["username"].GetString();
253 }
254 if (document.HasMember("password") && document["password"].IsString()) {
255 password_ = document["password"].GetString();
256 }
257
258 if (config && (!username_.empty() || !password_.empty())) {
259 config->set_auth_provider(
260 AuthProvider::Ptr(new enterprise::DsePlainTextAuthProvider(username_, password_, "")));
261 }
262
263 if (!document.HasMember("host") || !document["host"].IsString()) {
264 LOG_ERROR(CLOUD_ERROR "Missing host");
265 return false;
266 }
267 if (!document.HasMember("port") || !document["port"].IsInt()) {
268 LOG_ERROR(CLOUD_ERROR "Missing port");
269 return false;
270 }
271 host_ = document["host"].GetString();
272 port_ = document["port"].GetInt();
273
274 if (!zip_file.read_contents(CERTIFICATE_AUTHORITY_FILE, &ca_cert_)) {
275 LOG_ERROR(CLOUD_ERROR "Missing certificate authority file %s", CERTIFICATE_AUTHORITY_FILE);
276 return false;
277 }
278
279 if (!zip_file.read_contents(CERTIFICATE_FILE, &cert_)) {
280 LOG_ERROR(CLOUD_ERROR "Missing certificate file %s", CERTIFICATE_FILE);
281 return false;
282 }
283
284 if (!zip_file.read_contents(KEY_FILE, &key_)) {
285 LOG_ERROR(CLOUD_ERROR "Missing key file %s", KEY_FILE);
286 return false;
287 }
288
289 if (config) {
290 SslContext::Ptr ssl_context(SslContextFactory::create());
291
292 ssl_context->set_verify_flags(CASS_SSL_VERIFY_PEER_CERT | CASS_SSL_VERIFY_PEER_IDENTITY_DNS);
293
294 if (ssl_context->add_trusted_cert(ca_cert_.c_str(), ca_cert_.length()) != CASS_OK) {
295 LOG_ERROR(CLOUD_ERROR "Invalid CA certificate %s", CERTIFICATE_AUTHORITY_FILE);
296 return false;
297 }
298
299 if (ssl_context->set_cert(cert_.c_str(), cert_.length()) != CASS_OK) {
300 LOG_ERROR(CLOUD_ERROR "Invalid client certificate %s", CERTIFICATE_FILE);
301 return false;
302 }
303
304 if (ssl_context->set_private_key(key_.c_str(), key_.length(), NULL, 0) != CASS_OK) {
305 LOG_ERROR(CLOUD_ERROR "Invalid client private key %s", KEY_FILE);
306 return false;
307 }
308
309 config->set_ssl_context(ssl_context);
310
311 config->set_cluster_metadata_resolver_factory(
312 ClusterMetadataResolverFactory::Ptr(new CloudClusterMetadataResolverFactory(host_, port_)));
313 }
314
315 is_loaded_ = true;
316 return true;
317 #endif
318 }
319