1 /*
2   Copyright (c) DataStax, Inc.
3 
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7 
8   http://www.apache.org/licenses/LICENSE-2.0
9 
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15 */
16 
17 #include "metadata.hpp"
18 
19 #include "buffer.hpp"
20 #include "collection.hpp"
21 #include "collection_iterator.hpp"
22 #include "data_type_parser.hpp"
23 #include "external.hpp"
24 #include "iterator.hpp"
25 #include "json.hpp"
26 #include "logger.hpp"
27 #include "map_iterator.hpp"
28 #include "result_iterator.hpp"
29 #include "row.hpp"
30 #include "row_iterator.hpp"
31 #include "scoped_lock.hpp"
32 #include "string.hpp"
33 #include "utils.hpp"
34 #include "value.hpp"
35 
36 #include <algorithm>
37 #include <cmath>
38 #include <ctype.h>
39 #include <iterator>
40 
41 using namespace datastax;
42 using namespace datastax::internal;
43 using namespace datastax::internal::core;
44 
append_arguments(String & full_name,const String & arguments)45 static String& append_arguments(String& full_name, const String& arguments) {
46   full_name.push_back('(');
47   bool first = true;
48   IStringStream stream(arguments);
49   while (!stream.eof()) {
50     String argument;
51     std::getline(stream, argument, ',');
52     // Remove white-space
53     argument.erase(std::remove_if(argument.begin(), argument.end(), ::isspace), argument.end());
54     if (!argument.empty()) {
55       if (!first) full_name.push_back(',');
56       full_name.append(argument);
57       first = false;
58     }
59   }
60   full_name.push_back(')');
61   return full_name;
62 }
63 
64 extern "C" {
65 
cass_schema_meta_free(const CassSchemaMeta * schema_meta)66 void cass_schema_meta_free(const CassSchemaMeta* schema_meta) { delete schema_meta->from(); }
67 
cass_schema_meta_snapshot_version(const CassSchemaMeta * schema_meta)68 cass_uint32_t cass_schema_meta_snapshot_version(const CassSchemaMeta* schema_meta) {
69   return schema_meta->version();
70 }
71 
cass_schema_meta_version(const CassSchemaMeta * schema_meta)72 CassVersion cass_schema_meta_version(const CassSchemaMeta* schema_meta) {
73   CassVersion version;
74   version.major_version = schema_meta->server_version().major_version();
75   version.minor_version = schema_meta->server_version().minor_version();
76   version.patch_version = schema_meta->server_version().patch_version();
77   return version;
78 }
79 
cass_schema_meta_keyspace_by_name(const CassSchemaMeta * schema_meta,const char * keyspace)80 const CassKeyspaceMeta* cass_schema_meta_keyspace_by_name(const CassSchemaMeta* schema_meta,
81                                                           const char* keyspace) {
82   return cass_schema_meta_keyspace_by_name_n(schema_meta, keyspace, SAFE_STRLEN(keyspace));
83 }
84 
cass_schema_meta_keyspace_by_name_n(const CassSchemaMeta * schema_meta,const char * keyspace,size_t keyspace_length)85 const CassKeyspaceMeta* cass_schema_meta_keyspace_by_name_n(const CassSchemaMeta* schema_meta,
86                                                             const char* keyspace,
87                                                             size_t keyspace_length) {
88   return CassKeyspaceMeta::to(schema_meta->get_keyspace(String(keyspace, keyspace_length)));
89 }
90 
cass_keyspace_meta_name(const CassKeyspaceMeta * keyspace_meta,const char ** name,size_t * name_length)91 void cass_keyspace_meta_name(const CassKeyspaceMeta* keyspace_meta, const char** name,
92                              size_t* name_length) {
93   *name = keyspace_meta->name().data();
94   *name_length = keyspace_meta->name().size();
95 }
96 
cass_keyspace_meta_is_virtual(const CassKeyspaceMeta * keyspace_meta)97 cass_bool_t cass_keyspace_meta_is_virtual(const CassKeyspaceMeta* keyspace_meta) {
98   return keyspace_meta->is_virtual() ? cass_true : cass_false;
99 }
100 
cass_keyspace_meta_table_by_name(const CassKeyspaceMeta * keyspace_meta,const char * table)101 const CassTableMeta* cass_keyspace_meta_table_by_name(const CassKeyspaceMeta* keyspace_meta,
102                                                       const char* table) {
103   return cass_keyspace_meta_table_by_name_n(keyspace_meta, table, SAFE_STRLEN(table));
104 }
105 
cass_keyspace_meta_table_by_name_n(const CassKeyspaceMeta * keyspace_meta,const char * table,size_t table_length)106 const CassTableMeta* cass_keyspace_meta_table_by_name_n(const CassKeyspaceMeta* keyspace_meta,
107                                                         const char* table, size_t table_length) {
108 
109   return CassTableMeta::to(keyspace_meta->get_table(String(table, table_length)));
110 }
111 
112 const CassMaterializedViewMeta*
cass_keyspace_meta_materialized_view_by_name(const CassKeyspaceMeta * keyspace_meta,const char * view)113 cass_keyspace_meta_materialized_view_by_name(const CassKeyspaceMeta* keyspace_meta,
114                                              const char* view) {
115   return cass_keyspace_meta_materialized_view_by_name_n(keyspace_meta, view, SAFE_STRLEN(view));
116 }
117 
118 const CassMaterializedViewMeta*
cass_keyspace_meta_materialized_view_by_name_n(const CassKeyspaceMeta * keyspace_meta,const char * view,size_t view_length)119 cass_keyspace_meta_materialized_view_by_name_n(const CassKeyspaceMeta* keyspace_meta,
120                                                const char* view, size_t view_length) {
121 
122   return CassMaterializedViewMeta::to(keyspace_meta->get_view(String(view, view_length)));
123 }
124 
cass_keyspace_meta_user_type_by_name(const CassKeyspaceMeta * keyspace_meta,const char * type)125 const CassDataType* cass_keyspace_meta_user_type_by_name(const CassKeyspaceMeta* keyspace_meta,
126                                                          const char* type) {
127   return cass_keyspace_meta_user_type_by_name_n(keyspace_meta, type, SAFE_STRLEN(type));
128 }
129 
cass_keyspace_meta_user_type_by_name_n(const CassKeyspaceMeta * keyspace_meta,const char * type,size_t type_length)130 const CassDataType* cass_keyspace_meta_user_type_by_name_n(const CassKeyspaceMeta* keyspace_meta,
131                                                            const char* type, size_t type_length) {
132   return CassDataType::to(keyspace_meta->get_user_type(String(type, type_length)));
133 }
134 
cass_keyspace_meta_function_by_name(const CassKeyspaceMeta * keyspace_meta,const char * name,const char * arguments)135 const CassFunctionMeta* cass_keyspace_meta_function_by_name(const CassKeyspaceMeta* keyspace_meta,
136                                                             const char* name,
137                                                             const char* arguments) {
138   return cass_keyspace_meta_function_by_name_n(keyspace_meta, name, SAFE_STRLEN(name), arguments,
139                                                SAFE_STRLEN(arguments));
140 }
141 
cass_keyspace_meta_function_by_name_n(const CassKeyspaceMeta * keyspace_meta,const char * name,size_t name_length,const char * arguments,size_t arguments_length)142 const CassFunctionMeta* cass_keyspace_meta_function_by_name_n(const CassKeyspaceMeta* keyspace_meta,
143                                                               const char* name, size_t name_length,
144                                                               const char* arguments,
145                                                               size_t arguments_length) {
146   String full_function_name(name, name_length);
147   return CassFunctionMeta::to(keyspace_meta->get_function(
148       append_arguments(full_function_name, String(arguments, arguments_length))));
149 }
150 
cass_keyspace_meta_aggregate_by_name(const CassKeyspaceMeta * keyspace_meta,const char * name,const char * arguments)151 const CassAggregateMeta* cass_keyspace_meta_aggregate_by_name(const CassKeyspaceMeta* keyspace_meta,
152                                                               const char* name,
153                                                               const char* arguments) {
154   return cass_keyspace_meta_aggregate_by_name_n(keyspace_meta, name, SAFE_STRLEN(name), arguments,
155                                                 SAFE_STRLEN(arguments));
156 }
157 
158 const CassAggregateMeta*
cass_keyspace_meta_aggregate_by_name_n(const CassKeyspaceMeta * keyspace_meta,const char * name,size_t name_length,const char * arguments,size_t arguments_length)159 cass_keyspace_meta_aggregate_by_name_n(const CassKeyspaceMeta* keyspace_meta, const char* name,
160                                        size_t name_length, const char* arguments,
161                                        size_t arguments_length) {
162   String full_aggregate_name(name, name_length);
163   return CassAggregateMeta::to(keyspace_meta->get_aggregate(
164       append_arguments(full_aggregate_name, String(arguments, arguments_length))));
165 }
166 
cass_keyspace_meta_field_by_name(const CassKeyspaceMeta * keyspace_meta,const char * name)167 const CassValue* cass_keyspace_meta_field_by_name(const CassKeyspaceMeta* keyspace_meta,
168                                                   const char* name) {
169   return cass_keyspace_meta_field_by_name_n(keyspace_meta, name, SAFE_STRLEN(name));
170 }
171 
cass_keyspace_meta_field_by_name_n(const CassKeyspaceMeta * keyspace_meta,const char * name,size_t name_length)172 const CassValue* cass_keyspace_meta_field_by_name_n(const CassKeyspaceMeta* keyspace_meta,
173                                                     const char* name, size_t name_length) {
174   return CassValue::to(keyspace_meta->get_field(String(name, name_length)));
175 }
176 
cass_table_meta_name(const CassTableMeta * table_meta,const char ** name,size_t * name_length)177 void cass_table_meta_name(const CassTableMeta* table_meta, const char** name, size_t* name_length) {
178   *name = table_meta->name().data();
179   *name_length = table_meta->name().size();
180 }
181 
cass_table_meta_is_virtual(const CassTableMeta * table_meta)182 cass_bool_t cass_table_meta_is_virtual(const CassTableMeta* table_meta) {
183   return table_meta->is_virtual() ? cass_true : cass_false;
184 }
185 
cass_table_meta_column_by_name(const CassTableMeta * table_meta,const char * column)186 const CassColumnMeta* cass_table_meta_column_by_name(const CassTableMeta* table_meta,
187                                                      const char* column) {
188   return cass_table_meta_column_by_name_n(table_meta, column, SAFE_STRLEN(column));
189 }
190 
cass_table_meta_column_by_name_n(const CassTableMeta * table_meta,const char * column,size_t column_length)191 const CassColumnMeta* cass_table_meta_column_by_name_n(const CassTableMeta* table_meta,
192                                                        const char* column, size_t column_length) {
193   return CassColumnMeta::to(table_meta->get_column(String(column, column_length)));
194 }
195 
cass_table_meta_column_count(const CassTableMeta * table_meta)196 size_t cass_table_meta_column_count(const CassTableMeta* table_meta) {
197   return table_meta->columns().size();
198 }
199 
cass_table_meta_column(const CassTableMeta * table_meta,size_t index)200 const CassColumnMeta* cass_table_meta_column(const CassTableMeta* table_meta, size_t index) {
201   if (index >= table_meta->columns().size()) {
202     return NULL;
203   }
204   return CassColumnMeta::to(table_meta->columns()[index].get());
205 }
206 
cass_table_meta_index_by_name(const CassTableMeta * table_meta,const char * index)207 const CassIndexMeta* cass_table_meta_index_by_name(const CassTableMeta* table_meta,
208                                                    const char* index) {
209   return cass_table_meta_index_by_name_n(table_meta, index, SAFE_STRLEN(index));
210 }
211 
cass_table_meta_index_by_name_n(const CassTableMeta * table_meta,const char * index,size_t index_length)212 const CassIndexMeta* cass_table_meta_index_by_name_n(const CassTableMeta* table_meta,
213                                                      const char* index, size_t index_length) {
214   return CassIndexMeta::to(table_meta->get_index(String(index, index_length)));
215 }
216 
cass_table_meta_index_count(const CassTableMeta * table_meta)217 size_t cass_table_meta_index_count(const CassTableMeta* table_meta) {
218   return table_meta->indexes().size();
219 }
220 
cass_table_meta_index(const CassTableMeta * table_meta,size_t index)221 const CassIndexMeta* cass_table_meta_index(const CassTableMeta* table_meta, size_t index) {
222   if (index >= table_meta->indexes().size()) {
223     return NULL;
224   }
225   return CassIndexMeta::to(table_meta->indexes()[index].get());
226 }
227 
228 const CassMaterializedViewMeta*
cass_table_meta_materialized_view_by_name(const CassTableMeta * table_meta,const char * view)229 cass_table_meta_materialized_view_by_name(const CassTableMeta* table_meta, const char* view) {
230   return cass_table_meta_materialized_view_by_name_n(table_meta, view, SAFE_STRLEN(view));
231 }
232 
233 const CassMaterializedViewMeta*
cass_table_meta_materialized_view_by_name_n(const CassTableMeta * table_meta,const char * view,size_t view_length)234 cass_table_meta_materialized_view_by_name_n(const CassTableMeta* table_meta, const char* view,
235                                             size_t view_length) {
236   return CassMaterializedViewMeta::to(table_meta->get_view(String(view, view_length)));
237 }
238 
cass_table_meta_materialized_view_count(const CassTableMeta * table_meta)239 size_t cass_table_meta_materialized_view_count(const CassTableMeta* table_meta) {
240   return table_meta->views().size();
241 }
242 
cass_table_meta_materialized_view(const CassTableMeta * table_meta,size_t index)243 const CassMaterializedViewMeta* cass_table_meta_materialized_view(const CassTableMeta* table_meta,
244                                                                   size_t index) {
245   if (index >= table_meta->views().size()) {
246     return NULL;
247   }
248   return CassMaterializedViewMeta::to(table_meta->views()[index].get());
249 }
250 
cass_table_meta_partition_key_count(const CassTableMeta * table_meta)251 size_t cass_table_meta_partition_key_count(const CassTableMeta* table_meta) {
252   return table_meta->partition_key().size();
253 }
254 
cass_table_meta_partition_key(const CassTableMeta * table_meta,size_t index)255 const CassColumnMeta* cass_table_meta_partition_key(const CassTableMeta* table_meta, size_t index) {
256   if (index >= table_meta->partition_key().size()) {
257     return NULL;
258   }
259   return CassColumnMeta::to(table_meta->partition_key()[index].get());
260 }
261 
cass_table_meta_clustering_key_count(const CassTableMeta * table_meta)262 size_t cass_table_meta_clustering_key_count(const CassTableMeta* table_meta) {
263   return table_meta->clustering_key().size();
264 }
265 
cass_table_meta_clustering_key(const CassTableMeta * table_meta,size_t index)266 const CassColumnMeta* cass_table_meta_clustering_key(const CassTableMeta* table_meta,
267                                                      size_t index) {
268   if (index >= table_meta->clustering_key().size()) {
269     return NULL;
270   }
271   return CassColumnMeta::to(table_meta->clustering_key()[index].get());
272 }
273 
cass_table_meta_clustering_key_order(const CassTableMeta * table_meta,size_t index)274 CassClusteringOrder cass_table_meta_clustering_key_order(const CassTableMeta* table_meta,
275                                                          size_t index) {
276   if (index >= table_meta->clustering_key_order().size()) {
277     return CASS_CLUSTERING_ORDER_NONE;
278   }
279   return table_meta->clustering_key_order()[index];
280 }
281 
cass_table_meta_field_by_name(const CassTableMeta * table_meta,const char * name)282 const CassValue* cass_table_meta_field_by_name(const CassTableMeta* table_meta, const char* name) {
283   return cass_table_meta_field_by_name_n(table_meta, name, SAFE_STRLEN(name));
284 }
285 
cass_table_meta_field_by_name_n(const CassTableMeta * table_meta,const char * name,size_t name_length)286 const CassValue* cass_table_meta_field_by_name_n(const CassTableMeta* table_meta, const char* name,
287                                                  size_t name_length) {
288   return CassValue::to(table_meta->get_field(String(name, name_length)));
289 }
290 
291 const CassColumnMeta*
cass_materialized_view_meta_column_by_name(const CassMaterializedViewMeta * view_meta,const char * column)292 cass_materialized_view_meta_column_by_name(const CassMaterializedViewMeta* view_meta,
293                                            const char* column) {
294   return cass_materialized_view_meta_column_by_name_n(view_meta, column, SAFE_STRLEN(column));
295 }
296 
297 const CassColumnMeta*
cass_materialized_view_meta_column_by_name_n(const CassMaterializedViewMeta * view_meta,const char * column,size_t column_length)298 cass_materialized_view_meta_column_by_name_n(const CassMaterializedViewMeta* view_meta,
299                                              const char* column, size_t column_length) {
300   return CassColumnMeta::to(view_meta->get_column(String(column, column_length)));
301 }
302 
cass_materialized_view_meta_name(const CassMaterializedViewMeta * view_meta,const char ** name,size_t * name_length)303 void cass_materialized_view_meta_name(const CassMaterializedViewMeta* view_meta, const char** name,
304                                       size_t* name_length) {
305   *name = view_meta->name().data();
306   *name_length = view_meta->name().size();
307 }
308 
309 const CassTableMeta*
cass_materialized_view_meta_base_table(const CassMaterializedViewMeta * view_meta)310 cass_materialized_view_meta_base_table(const CassMaterializedViewMeta* view_meta) {
311   if (view_meta == NULL) { // Materialized views may be NULL (Cassandra < v3.0.0)
312     return NULL;
313   }
314   return CassTableMeta::to(view_meta->base_table());
315 }
316 
317 const CassValue*
cass_materialized_view_meta_field_by_name(const CassMaterializedViewMeta * view_meta,const char * name)318 cass_materialized_view_meta_field_by_name(const CassMaterializedViewMeta* view_meta,
319                                           const char* name) {
320   return cass_materialized_view_meta_field_by_name_n(view_meta, name, SAFE_STRLEN(name));
321 }
322 
323 const CassValue*
cass_materialized_view_meta_field_by_name_n(const CassMaterializedViewMeta * view_meta,const char * name,size_t name_length)324 cass_materialized_view_meta_field_by_name_n(const CassMaterializedViewMeta* view_meta,
325                                             const char* name, size_t name_length) {
326   return CassValue::to(view_meta->get_field(String(name, name_length)));
327 }
328 
cass_materialized_view_meta_column_count(const CassMaterializedViewMeta * view_meta)329 size_t cass_materialized_view_meta_column_count(const CassMaterializedViewMeta* view_meta) {
330   return view_meta->columns().size();
331 }
332 
cass_materialized_view_meta_column(const CassMaterializedViewMeta * view_meta,size_t index)333 const CassColumnMeta* cass_materialized_view_meta_column(const CassMaterializedViewMeta* view_meta,
334                                                          size_t index) {
335   if (index >= view_meta->columns().size()) {
336     return NULL;
337   }
338   return CassColumnMeta::to(view_meta->columns()[index].get());
339 }
340 
cass_materialized_view_meta_partition_key_count(const CassMaterializedViewMeta * view_meta)341 size_t cass_materialized_view_meta_partition_key_count(const CassMaterializedViewMeta* view_meta) {
342   return view_meta->partition_key().size();
343 }
344 
345 const CassColumnMeta*
cass_materialized_view_meta_partition_key(const CassMaterializedViewMeta * view_meta,size_t index)346 cass_materialized_view_meta_partition_key(const CassMaterializedViewMeta* view_meta, size_t index) {
347   if (index >= view_meta->partition_key().size()) {
348     return NULL;
349   }
350   return CassColumnMeta::to(view_meta->partition_key()[index].get());
351 }
352 
cass_materialized_view_meta_clustering_key_count(const CassMaterializedViewMeta * view_meta)353 size_t cass_materialized_view_meta_clustering_key_count(const CassMaterializedViewMeta* view_meta) {
354   return view_meta->clustering_key().size();
355 }
356 
357 const CassColumnMeta*
cass_materialized_view_meta_clustering_key(const CassMaterializedViewMeta * view_meta,size_t index)358 cass_materialized_view_meta_clustering_key(const CassMaterializedViewMeta* view_meta,
359                                            size_t index) {
360   if (index >= view_meta->clustering_key().size()) {
361     return NULL;
362   }
363   return CassColumnMeta::to(view_meta->clustering_key()[index].get());
364 }
365 
366 CassClusteringOrder
cass_materialized_view_meta_clustering_key_order(const CassMaterializedViewMeta * view_meta,size_t index)367 cass_materialized_view_meta_clustering_key_order(const CassMaterializedViewMeta* view_meta,
368                                                  size_t index) {
369   if (index >= view_meta->clustering_key_order().size()) {
370     return CASS_CLUSTERING_ORDER_NONE;
371   }
372   return view_meta->clustering_key_order()[index];
373 }
374 
cass_column_meta_name(const CassColumnMeta * column_meta,const char ** name,size_t * name_length)375 void cass_column_meta_name(const CassColumnMeta* column_meta, const char** name,
376                            size_t* name_length) {
377   *name = column_meta->name().data();
378   *name_length = column_meta->name().size();
379 }
380 
cass_column_meta_type(const CassColumnMeta * column_meta)381 CassColumnType cass_column_meta_type(const CassColumnMeta* column_meta) {
382   return column_meta->type();
383 }
384 
cass_column_meta_data_type(const CassColumnMeta * column_meta)385 const CassDataType* cass_column_meta_data_type(const CassColumnMeta* column_meta) {
386   return CassDataType::to(column_meta->data_type().get());
387 }
388 
cass_column_meta_field_by_name(const CassColumnMeta * column_meta,const char * name)389 const CassValue* cass_column_meta_field_by_name(const CassColumnMeta* column_meta,
390                                                 const char* name) {
391   return cass_column_meta_field_by_name_n(column_meta, name, SAFE_STRLEN(name));
392 }
393 
cass_column_meta_field_by_name_n(const CassColumnMeta * column_meta,const char * name,size_t name_length)394 const CassValue* cass_column_meta_field_by_name_n(const CassColumnMeta* column_meta,
395                                                   const char* name, size_t name_length) {
396   return CassValue::to(column_meta->get_field(String(name, name_length)));
397 }
398 
cass_index_meta_name(const CassIndexMeta * index_meta,const char ** name,size_t * name_length)399 void cass_index_meta_name(const CassIndexMeta* index_meta, const char** name, size_t* name_length) {
400   *name = index_meta->name().data();
401   *name_length = index_meta->name().size();
402 }
403 
cass_index_meta_type(const CassIndexMeta * index_meta)404 CassIndexType cass_index_meta_type(const CassIndexMeta* index_meta) { return index_meta->type(); }
405 
cass_index_meta_target(const CassIndexMeta * index_meta,const char ** target,size_t * target_length)406 void cass_index_meta_target(const CassIndexMeta* index_meta, const char** target,
407                             size_t* target_length) {
408   *target = index_meta->target().data();
409   *target_length = index_meta->target().size();
410 }
411 
cass_index_meta_options(const CassIndexMeta * index_meta)412 const CassValue* cass_index_meta_options(const CassIndexMeta* index_meta) {
413   return CassValue::to(index_meta->options());
414 }
415 
cass_index_meta_field_by_name(const CassIndexMeta * index_meta,const char * name)416 const CassValue* cass_index_meta_field_by_name(const CassIndexMeta* index_meta, const char* name) {
417   return cass_index_meta_field_by_name_n(index_meta, name, SAFE_STRLEN(name));
418 }
419 
cass_index_meta_field_by_name_n(const CassIndexMeta * index_meta,const char * name,size_t name_length)420 const CassValue* cass_index_meta_field_by_name_n(const CassIndexMeta* index_meta, const char* name,
421                                                  size_t name_length) {
422   return CassValue::to(index_meta->get_field(String(name, name_length)));
423 }
424 
cass_function_meta_name(const CassFunctionMeta * function_meta,const char ** name,size_t * name_length)425 void cass_function_meta_name(const CassFunctionMeta* function_meta, const char** name,
426                              size_t* name_length) {
427   *name = function_meta->simple_name().data();
428   *name_length = function_meta->simple_name().size();
429 }
430 
cass_function_meta_full_name(const CassFunctionMeta * function_meta,const char ** full_name,size_t * full_name_length)431 void cass_function_meta_full_name(const CassFunctionMeta* function_meta, const char** full_name,
432                                   size_t* full_name_length) {
433   *full_name = function_meta->name().data();
434   *full_name_length = function_meta->name().size();
435 }
436 
cass_function_meta_body(const CassFunctionMeta * function_meta,const char ** body,size_t * body_length)437 void cass_function_meta_body(const CassFunctionMeta* function_meta, const char** body,
438                              size_t* body_length) {
439   *body = function_meta->body().data();
440   *body_length = function_meta->body().size();
441 }
442 
cass_function_meta_language(const CassFunctionMeta * function_meta,const char ** language,size_t * language_length)443 void cass_function_meta_language(const CassFunctionMeta* function_meta, const char** language,
444                                  size_t* language_length) {
445   *language = function_meta->language().data();
446   *language_length = function_meta->language().size();
447 }
448 
cass_function_meta_called_on_null_input(const CassFunctionMeta * function_meta)449 cass_bool_t cass_function_meta_called_on_null_input(const CassFunctionMeta* function_meta) {
450   return function_meta->called_on_null_input() ? cass_true : cass_false;
451 }
452 
cass_function_meta_argument_count(const CassFunctionMeta * function_meta)453 size_t cass_function_meta_argument_count(const CassFunctionMeta* function_meta) {
454   return function_meta->args().size();
455 }
456 
cass_function_meta_argument(const CassFunctionMeta * function_meta,size_t index,const char ** name,size_t * name_length,const CassDataType ** type)457 CassError cass_function_meta_argument(const CassFunctionMeta* function_meta, size_t index,
458                                       const char** name, size_t* name_length,
459                                       const CassDataType** type) {
460   if (index >= function_meta->args().size()) {
461     return CASS_ERROR_LIB_INDEX_OUT_OF_BOUNDS;
462   }
463   const FunctionMetadata::Argument& arg = function_meta->args()[index];
464   *name = arg.name.data();
465   *name_length = arg.name.size();
466   *type = CassDataType::to(arg.type.get());
467   return CASS_OK;
468 }
469 
cass_function_meta_argument_type_by_name(const CassFunctionMeta * function_meta,const char * name)470 const CassDataType* cass_function_meta_argument_type_by_name(const CassFunctionMeta* function_meta,
471                                                              const char* name) {
472   return cass_function_meta_argument_type_by_name_n(function_meta, name, SAFE_STRLEN(name));
473 }
474 
475 const CassDataType*
cass_function_meta_argument_type_by_name_n(const CassFunctionMeta * function_meta,const char * name,size_t name_length)476 cass_function_meta_argument_type_by_name_n(const CassFunctionMeta* function_meta, const char* name,
477                                            size_t name_length) {
478   return CassDataType::to(function_meta->get_arg_type(StringRef(name, name_length)));
479 }
480 
cass_function_meta_return_type(const CassFunctionMeta * function_meta)481 const CassDataType* cass_function_meta_return_type(const CassFunctionMeta* function_meta) {
482   return CassDataType::to(function_meta->return_type().get());
483 }
484 
cass_function_meta_field_by_name(const CassFunctionMeta * function_meta,const char * name)485 const CassValue* cass_function_meta_field_by_name(const CassFunctionMeta* function_meta,
486                                                   const char* name) {
487   return cass_function_meta_field_by_name_n(function_meta, name, SAFE_STRLEN(name));
488 }
489 
cass_function_meta_field_by_name_n(const CassFunctionMeta * function_meta,const char * name,size_t name_length)490 const CassValue* cass_function_meta_field_by_name_n(const CassFunctionMeta* function_meta,
491                                                     const char* name, size_t name_length) {
492   return CassValue::to(function_meta->get_field(String(name, name_length)));
493 }
494 
cass_aggregate_meta_name(const CassAggregateMeta * aggregate_meta,const char ** name,size_t * name_length)495 void cass_aggregate_meta_name(const CassAggregateMeta* aggregate_meta, const char** name,
496                               size_t* name_length) {
497   *name = aggregate_meta->simple_name().data();
498   *name_length = aggregate_meta->simple_name().size();
499 }
500 
cass_aggregate_meta_full_name(const CassAggregateMeta * aggregate_meta,const char ** full_name,size_t * full_name_length)501 void cass_aggregate_meta_full_name(const CassAggregateMeta* aggregate_meta, const char** full_name,
502                                    size_t* full_name_length) {
503   *full_name = aggregate_meta->name().data();
504   *full_name_length = aggregate_meta->name().size();
505 }
506 
cass_aggregate_meta_argument_count(const CassAggregateMeta * aggregate_meta)507 size_t cass_aggregate_meta_argument_count(const CassAggregateMeta* aggregate_meta) {
508   return aggregate_meta->arg_types().size();
509 }
510 
cass_aggregate_meta_argument_type(const CassAggregateMeta * aggregate_meta,size_t index)511 const CassDataType* cass_aggregate_meta_argument_type(const CassAggregateMeta* aggregate_meta,
512                                                       size_t index) {
513   if (index >= aggregate_meta->arg_types().size()) {
514     return NULL;
515   }
516   return CassDataType::to(aggregate_meta->arg_types()[index].get());
517 }
518 
cass_aggregate_meta_return_type(const CassAggregateMeta * aggregate_meta)519 const CassDataType* cass_aggregate_meta_return_type(const CassAggregateMeta* aggregate_meta) {
520   return CassDataType::to(aggregate_meta->return_type().get());
521 }
522 
cass_aggregate_meta_state_type(const CassAggregateMeta * aggregate_meta)523 const CassDataType* cass_aggregate_meta_state_type(const CassAggregateMeta* aggregate_meta) {
524   return CassDataType::to(aggregate_meta->state_type().get());
525 }
526 
cass_aggregate_meta_state_func(const CassAggregateMeta * aggregate_meta)527 const CassFunctionMeta* cass_aggregate_meta_state_func(const CassAggregateMeta* aggregate_meta) {
528   return CassFunctionMeta::to(aggregate_meta->state_func().get());
529 }
530 
cass_aggregate_meta_final_func(const CassAggregateMeta * aggregate_meta)531 const CassFunctionMeta* cass_aggregate_meta_final_func(const CassAggregateMeta* aggregate_meta) {
532   return CassFunctionMeta::to(aggregate_meta->final_func().get());
533 }
534 
cass_aggregate_meta_init_cond(const CassAggregateMeta * aggregate_meta)535 const CassValue* cass_aggregate_meta_init_cond(const CassAggregateMeta* aggregate_meta) {
536   return CassValue::to(&aggregate_meta->init_cond());
537 }
538 
cass_aggregate_meta_field_by_name(const CassAggregateMeta * aggregate_meta,const char * name)539 const CassValue* cass_aggregate_meta_field_by_name(const CassAggregateMeta* aggregate_meta,
540                                                    const char* name) {
541   return cass_aggregate_meta_field_by_name_n(aggregate_meta, name, SAFE_STRLEN(name));
542 }
543 
cass_aggregate_meta_field_by_name_n(const CassAggregateMeta * aggregate_meta,const char * name,size_t name_length)544 const CassValue* cass_aggregate_meta_field_by_name_n(const CassAggregateMeta* aggregate_meta,
545                                                      const char* name, size_t name_length) {
546   return CassValue::to(aggregate_meta->get_field(String(name, name_length)));
547 }
548 
cass_iterator_keyspaces_from_schema_meta(const CassSchemaMeta * schema_meta)549 CassIterator* cass_iterator_keyspaces_from_schema_meta(const CassSchemaMeta* schema_meta) {
550   return CassIterator::to(schema_meta->iterator_keyspaces());
551 }
552 
cass_iterator_tables_from_keyspace_meta(const CassKeyspaceMeta * keyspace_meta)553 CassIterator* cass_iterator_tables_from_keyspace_meta(const CassKeyspaceMeta* keyspace_meta) {
554   return CassIterator::to(keyspace_meta->iterator_tables());
555 }
556 
557 CassIterator*
cass_iterator_materialized_views_from_keyspace_meta(const CassKeyspaceMeta * keyspace_meta)558 cass_iterator_materialized_views_from_keyspace_meta(const CassKeyspaceMeta* keyspace_meta) {
559   return CassIterator::to(keyspace_meta->iterator_views());
560 }
561 
cass_iterator_user_types_from_keyspace_meta(const CassKeyspaceMeta * keyspace_meta)562 CassIterator* cass_iterator_user_types_from_keyspace_meta(const CassKeyspaceMeta* keyspace_meta) {
563   return CassIterator::to(keyspace_meta->iterator_user_types());
564 }
565 
cass_iterator_functions_from_keyspace_meta(const CassKeyspaceMeta * keyspace_meta)566 CassIterator* cass_iterator_functions_from_keyspace_meta(const CassKeyspaceMeta* keyspace_meta) {
567   return CassIterator::to(keyspace_meta->iterator_functions());
568 }
569 
cass_iterator_aggregates_from_keyspace_meta(const CassKeyspaceMeta * keyspace_meta)570 CassIterator* cass_iterator_aggregates_from_keyspace_meta(const CassKeyspaceMeta* keyspace_meta) {
571   return CassIterator::to(keyspace_meta->iterator_aggregates());
572 }
573 
cass_iterator_fields_from_keyspace_meta(const CassKeyspaceMeta * keyspace_meta)574 CassIterator* cass_iterator_fields_from_keyspace_meta(const CassKeyspaceMeta* keyspace_meta) {
575   return CassIterator::to(keyspace_meta->iterator_fields());
576 }
577 
cass_iterator_columns_from_table_meta(const CassTableMeta * table_meta)578 CassIterator* cass_iterator_columns_from_table_meta(const CassTableMeta* table_meta) {
579   return CassIterator::to(table_meta->iterator_columns());
580 }
581 
cass_iterator_materialized_views_from_table_meta(const CassTableMeta * table_meta)582 CassIterator* cass_iterator_materialized_views_from_table_meta(const CassTableMeta* table_meta) {
583   return CassIterator::to(table_meta->iterator_views());
584 }
585 
cass_iterator_indexes_from_table_meta(const CassTableMeta * table_meta)586 CassIterator* cass_iterator_indexes_from_table_meta(const CassTableMeta* table_meta) {
587   return CassIterator::to(table_meta->iterator_indexes());
588 }
589 
cass_iterator_fields_from_table_meta(const CassTableMeta * table_meta)590 CassIterator* cass_iterator_fields_from_table_meta(const CassTableMeta* table_meta) {
591   return CassIterator::to(table_meta->iterator_fields());
592 }
593 
594 CassIterator*
cass_iterator_columns_from_materialized_view_meta(const CassMaterializedViewMeta * view_meta)595 cass_iterator_columns_from_materialized_view_meta(const CassMaterializedViewMeta* view_meta) {
596   return CassIterator::to(view_meta->iterator_columns());
597 }
598 
599 CassIterator*
cass_iterator_fields_from_materialized_view_meta(const CassMaterializedViewMeta * view_meta)600 cass_iterator_fields_from_materialized_view_meta(const CassMaterializedViewMeta* view_meta) {
601   return CassIterator::to(view_meta->iterator_fields());
602 }
603 
cass_iterator_fields_from_column_meta(const CassColumnMeta * column_meta)604 CassIterator* cass_iterator_fields_from_column_meta(const CassColumnMeta* column_meta) {
605   return CassIterator::to(column_meta->iterator_fields());
606 }
607 
cass_iterator_fields_from_index_meta(const CassIndexMeta * index_meta)608 CassIterator* cass_iterator_fields_from_index_meta(const CassIndexMeta* index_meta) {
609   return CassIterator::to(index_meta->iterator_fields());
610 }
611 
cass_iterator_fields_from_function_meta(const CassFunctionMeta * function_meta)612 CassIterator* cass_iterator_fields_from_function_meta(const CassFunctionMeta* function_meta) {
613   return CassIterator::to(function_meta->iterator_fields());
614 }
615 
cass_iterator_fields_from_aggregate_meta(const CassAggregateMeta * aggregate_meta)616 CassIterator* cass_iterator_fields_from_aggregate_meta(const CassAggregateMeta* aggregate_meta) {
617   return CassIterator::to(aggregate_meta->iterator_fields());
618 }
619 
cass_iterator_get_keyspace_meta(const CassIterator * iterator)620 const CassKeyspaceMeta* cass_iterator_get_keyspace_meta(const CassIterator* iterator) {
621   if (iterator->type() != CASS_ITERATOR_TYPE_KEYSPACE_META) {
622     return NULL;
623   }
624   return CassKeyspaceMeta::to(
625       static_cast<const Metadata::KeyspaceIterator*>(iterator->from())->keyspace());
626 }
627 
cass_iterator_get_table_meta(const CassIterator * iterator)628 const CassTableMeta* cass_iterator_get_table_meta(const CassIterator* iterator) {
629   if (iterator->type() != CASS_ITERATOR_TYPE_TABLE_META) {
630     return NULL;
631   }
632   return CassTableMeta::to(
633       static_cast<const KeyspaceMetadata::TableIterator*>(iterator->from())->table());
634 }
635 
636 const CassMaterializedViewMeta*
cass_iterator_get_materialized_view_meta(const CassIterator * iterator)637 cass_iterator_get_materialized_view_meta(const CassIterator* iterator) {
638   if (iterator->type() != CASS_ITERATOR_TYPE_MATERIALIZED_VIEW_META) {
639     return NULL;
640   }
641   return CassMaterializedViewMeta::to(
642       static_cast<const ViewIteratorBase*>(iterator->from())->view());
643 }
644 
cass_iterator_get_user_type(const CassIterator * iterator)645 const CassDataType* cass_iterator_get_user_type(const CassIterator* iterator) {
646   if (iterator->type() != CASS_ITERATOR_TYPE_TYPE_META) {
647     return NULL;
648   }
649   return CassDataType::to(
650       static_cast<const KeyspaceMetadata::TypeIterator*>(iterator->from())->type());
651 }
652 
cass_iterator_get_function_meta(const CassIterator * iterator)653 const CassFunctionMeta* cass_iterator_get_function_meta(const CassIterator* iterator) {
654   if (iterator->type() != CASS_ITERATOR_TYPE_FUNCTION_META) {
655     return NULL;
656   }
657   return CassFunctionMeta::to(
658       static_cast<const KeyspaceMetadata::FunctionIterator*>(iterator->from())->function());
659 }
660 
cass_iterator_get_aggregate_meta(const CassIterator * iterator)661 const CassAggregateMeta* cass_iterator_get_aggregate_meta(const CassIterator* iterator) {
662   if (iterator->type() != CASS_ITERATOR_TYPE_AGGREGATE_META) {
663     return NULL;
664   }
665   return CassAggregateMeta::to(
666       static_cast<const KeyspaceMetadata::AggregateIterator*>(iterator->from())->aggregate());
667 }
668 
cass_iterator_get_column_meta(const CassIterator * iterator)669 const CassColumnMeta* cass_iterator_get_column_meta(const CassIterator* iterator) {
670   if (iterator->type() != CASS_ITERATOR_TYPE_COLUMN_META) {
671     return NULL;
672   }
673   return CassColumnMeta::to(
674       static_cast<const TableMetadata::ColumnIterator*>(iterator->from())->column());
675 }
676 
cass_iterator_get_index_meta(const CassIterator * iterator)677 const CassIndexMeta* cass_iterator_get_index_meta(const CassIterator* iterator) {
678   if (iterator->type() != CASS_ITERATOR_TYPE_INDEX_META) {
679     return NULL;
680   }
681   return CassIndexMeta::to(
682       static_cast<const TableMetadata::IndexIterator*>(iterator->from())->index());
683 }
684 
cass_iterator_get_meta_field_name(const CassIterator * iterator,const char ** name,size_t * name_length)685 CassError cass_iterator_get_meta_field_name(const CassIterator* iterator, const char** name,
686                                             size_t* name_length) {
687   if (iterator->type() != CASS_ITERATOR_TYPE_META_FIELD) {
688     return CASS_ERROR_LIB_BAD_PARAMS;
689   }
690   const MetadataField* field = static_cast<const MetadataFieldIterator*>(iterator->from())->field();
691   *name = field->name().data();
692   *name_length = field->name().size();
693   return CASS_OK;
694 }
695 
cass_iterator_get_meta_field_value(const CassIterator * iterator)696 const CassValue* cass_iterator_get_meta_field_value(const CassIterator* iterator) {
697   if (iterator->type() != CASS_ITERATOR_TYPE_META_FIELD) {
698     return NULL;
699   }
700   return CassValue::to(
701       static_cast<const MetadataFieldIterator*>(iterator->from())->field()->value());
702 }
703 
704 } // extern "C"
705 
table_column_name(const VersionNumber & server_version)706 static const char* table_column_name(const VersionNumber& server_version) {
707   return server_version >= VersionNumber(3, 0, 0) ? "table_name" : "columnfamily_name";
708 }
709 
signature_column_name(const VersionNumber & server_version)710 static const char* signature_column_name(const VersionNumber& server_version) {
711   return server_version >= VersionNumber(3, 0, 0) ? "argument_types" : "signature";
712 }
713 
714 template <class T>
as_const(const T & x)715 const T& as_const(const T& x) {
716   return x;
717 }
718 
get_keyspace(const String & name) const719 const KeyspaceMetadata* Metadata::SchemaSnapshot::get_keyspace(const String& name) const {
720   KeyspaceMetadata::Map::const_iterator i = keyspaces_->find(name);
721   if (i == keyspaces_->end()) return NULL;
722   return &i->second;
723 }
724 
get_user_type(const String & keyspace_name,const String & type_name) const725 const UserType* Metadata::SchemaSnapshot::get_user_type(const String& keyspace_name,
726                                                         const String& type_name) const {
727   KeyspaceMetadata::Map::const_iterator i = keyspaces_->find(keyspace_name);
728   if (i == keyspaces_->end()) {
729     return NULL;
730   }
731   return i->second.get_user_type(type_name);
732 }
733 
full_function_name(const String & name,const StringVec & signature)734 String Metadata::full_function_name(const String& name, const StringVec& signature) {
735   String full_function_name(name);
736   full_function_name.push_back('(');
737   for (StringVec::const_iterator i = signature.begin(), end = signature.end(); i != end; ++i) {
738     String argument(*i);
739     // Remove white-space
740     argument.erase(std::remove_if(argument.begin(), argument.end(), ::isspace), argument.end());
741     if (!argument.empty()) {
742       if (i != signature.begin()) full_function_name.push_back(',');
743       full_function_name.append(argument);
744     }
745   }
746   full_function_name.push_back(')');
747   return full_function_name;
748 }
749 
schema_snapshot() const750 Metadata::SchemaSnapshot Metadata::schema_snapshot() const {
751   ScopedMutex l(&mutex_);
752   return SchemaSnapshot(schema_snapshot_version_, server_version_, front_.keyspaces());
753 }
754 
update_keyspaces(const ResultResponse * result,bool is_virtual)755 void Metadata::update_keyspaces(const ResultResponse* result, bool is_virtual) {
756   schema_snapshot_version_++;
757 
758   if (is_front_buffer()) {
759     ScopedMutex l(&mutex_);
760     updating_->update_keyspaces(server_version_, result, is_virtual);
761   } else {
762     updating_->update_keyspaces(server_version_, result, is_virtual);
763   }
764 }
765 
update_tables(const ResultResponse * result)766 void Metadata::update_tables(const ResultResponse* result) {
767   schema_snapshot_version_++;
768 
769   if (is_front_buffer()) {
770     ScopedMutex l(&mutex_);
771     updating_->update_tables(server_version_, result);
772   } else {
773     updating_->update_tables(server_version_, result);
774   }
775 }
776 
update_views(const ResultResponse * result)777 void Metadata::update_views(const ResultResponse* result) {
778   schema_snapshot_version_++;
779 
780   if (is_front_buffer()) {
781     ScopedMutex l(&mutex_);
782     updating_->update_views(server_version_, result);
783   } else {
784     updating_->update_views(server_version_, result);
785   }
786 }
787 
update_columns(const ResultResponse * result)788 void Metadata::update_columns(const ResultResponse* result) {
789   schema_snapshot_version_++;
790 
791   if (is_front_buffer()) {
792     ScopedMutex l(&mutex_);
793     updating_->update_columns(server_version_, cache_, result);
794     if (server_version_ < VersionNumber(3, 0, 0)) {
795       updating_->update_legacy_indexes(server_version_, result);
796     }
797   } else {
798     updating_->update_columns(server_version_, cache_, result);
799     if (server_version_ < VersionNumber(3, 0, 0)) {
800       updating_->update_legacy_indexes(server_version_, result);
801     }
802   }
803 }
804 
update_indexes(const ResultResponse * result)805 void Metadata::update_indexes(const ResultResponse* result) {
806   schema_snapshot_version_++;
807 
808   if (is_front_buffer()) {
809     ScopedMutex l(&mutex_);
810     updating_->update_indexes(server_version_, result);
811   } else {
812     updating_->update_indexes(server_version_, result);
813   }
814 }
815 
update_user_types(const ResultResponse * result)816 void Metadata::update_user_types(const ResultResponse* result) {
817   schema_snapshot_version_++;
818 
819   if (is_front_buffer()) {
820     ScopedMutex l(&mutex_);
821     updating_->update_user_types(server_version_, cache_, result);
822   } else {
823     updating_->update_user_types(server_version_, cache_, result);
824   }
825 }
826 
update_functions(const ResultResponse * result)827 void Metadata::update_functions(const ResultResponse* result) {
828   schema_snapshot_version_++;
829 
830   if (is_front_buffer()) {
831     ScopedMutex l(&mutex_);
832     updating_->update_functions(server_version_, cache_, result);
833   } else {
834     updating_->update_functions(server_version_, cache_, result);
835   }
836 }
837 
update_aggregates(const ResultResponse * result)838 void Metadata::update_aggregates(const ResultResponse* result) {
839   schema_snapshot_version_++;
840 
841   if (is_front_buffer()) {
842     ScopedMutex l(&mutex_);
843     updating_->update_aggregates(server_version_, cache_, result);
844   } else {
845     updating_->update_aggregates(server_version_, cache_, result);
846   }
847 }
848 
drop_keyspace(const String & keyspace_name)849 void Metadata::drop_keyspace(const String& keyspace_name) {
850   schema_snapshot_version_++;
851 
852   if (is_front_buffer()) {
853     ScopedMutex l(&mutex_);
854     updating_->drop_keyspace(keyspace_name);
855   } else {
856     updating_->drop_keyspace(keyspace_name);
857   }
858 }
859 
drop_table_or_view(const String & keyspace_name,const String & table_or_view_name)860 void Metadata::drop_table_or_view(const String& keyspace_name, const String& table_or_view_name) {
861   schema_snapshot_version_++;
862 
863   if (is_front_buffer()) {
864     ScopedMutex l(&mutex_);
865     updating_->drop_table_or_view(keyspace_name, table_or_view_name);
866   } else {
867     updating_->drop_table_or_view(keyspace_name, table_or_view_name);
868   }
869 }
870 
drop_user_type(const String & keyspace_name,const String & type_name)871 void Metadata::drop_user_type(const String& keyspace_name, const String& type_name) {
872   schema_snapshot_version_++;
873 
874   if (is_front_buffer()) {
875     ScopedMutex l(&mutex_);
876     updating_->drop_user_type(keyspace_name, type_name);
877   } else {
878     updating_->drop_user_type(keyspace_name, type_name);
879   }
880 }
881 
drop_function(const String & keyspace_name,const String & full_function_name)882 void Metadata::drop_function(const String& keyspace_name, const String& full_function_name) {
883   schema_snapshot_version_++;
884 
885   if (is_front_buffer()) {
886     ScopedMutex l(&mutex_);
887     updating_->drop_function(keyspace_name, full_function_name);
888   } else {
889     updating_->drop_function(keyspace_name, full_function_name);
890   }
891 }
892 
drop_aggregate(const String & keyspace_name,const String & full_aggregate_name)893 void Metadata::drop_aggregate(const String& keyspace_name, const String& full_aggregate_name) {
894   schema_snapshot_version_++;
895 
896   if (is_front_buffer()) {
897     ScopedMutex l(&mutex_);
898     updating_->drop_aggregate(keyspace_name, full_aggregate_name);
899   } else {
900     updating_->drop_aggregate(keyspace_name, full_aggregate_name);
901   }
902 }
903 
clear_and_update_back(const VersionNumber & server_version)904 void Metadata::clear_and_update_back(const VersionNumber& server_version) {
905   {
906     ScopedMutex l(&mutex_);
907     server_version_ = server_version;
908   }
909   back_.clear();
910   updating_ = &back_;
911 }
912 
swap_to_back_and_update_front()913 void Metadata::swap_to_back_and_update_front() {
914   {
915     ScopedMutex l(&mutex_);
916     schema_snapshot_version_++;
917     front_.swap(back_);
918   }
919   back_.clear();
920   updating_ = &front_;
921 }
922 
clear()923 void Metadata::clear() {
924   {
925     ScopedMutex l(&mutex_);
926     schema_snapshot_version_ = 0;
927     front_.clear();
928   }
929   back_.clear();
930 }
931 
get_field(const String & name) const932 const Value* MetadataBase::get_field(const String& name) const {
933   MetadataField::Map::const_iterator it = fields_.find(name);
934   if (it == fields_.end()) return NULL;
935   return it->second.value();
936 }
937 
get_string_field(const String & name) const938 String MetadataBase::get_string_field(const String& name) const {
939   const Value* value = get_field(name);
940   if (value == NULL) return String();
941   return value->to_string();
942 }
943 
add_field(const RefBuffer::Ptr & buffer,const Row * row,const String & name)944 const Value* MetadataBase::add_field(const RefBuffer::Ptr& buffer, const Row* row,
945                                      const String& name) {
946   const Value* value = row->get_by_name(name);
947   if (value == NULL) return NULL;
948   if (value->is_null()) {
949     fields_[name] = MetadataField(name);
950     return NULL; // Return NULL for "null" columns
951   } else {
952     fields_[name] = MetadataField(name, *value, buffer);
953     return value;
954   }
955 }
956 
add_field(const RefBuffer::Ptr & buffer,const Value & value,const String & name)957 void MetadataBase::add_field(const RefBuffer::Ptr& buffer, const Value& value, const String& name) {
958   fields_[name] = MetadataField(name, value, buffer);
959 }
960 
add_json_list_field(const Row * row,const String & name)961 void MetadataBase::add_json_list_field(const Row* row, const String& name) {
962   const Value* value = row->get_by_name(name);
963   if (value == NULL) return;
964   if (value->is_null()) {
965     fields_[name] = MetadataField(name);
966     return;
967   }
968 
969   Vector<char> buf = value->decoder().as_vector();
970   json::Document d;
971   d.ParseInsitu(&buf[0]);
972 
973   if (d.HasParseError()) {
974     LOG_ERROR("Unable to parse JSON (array) for column '%s'", name.c_str());
975     return;
976   }
977 
978   if (!d.IsArray()) {
979     LOG_DEBUG("Expected JSON array for column '%s' (probably null or empty)", name.c_str());
980     fields_[name] = MetadataField(name);
981     return;
982   }
983 
984   Collection collection(
985       CollectionType::list(DataType::Ptr(new DataType(CASS_VALUE_TYPE_TEXT)), false), d.Size());
986   for (json::Value::ConstValueIterator i = d.Begin(); i != d.End(); ++i) {
987     collection.append(CassString(i->GetString(), i->GetStringLength()));
988   }
989 
990   size_t encoded_size = collection.get_items_size();
991   RefBuffer::Ptr encoded(RefBuffer::create(encoded_size));
992 
993   collection.encode_items(encoded->data());
994 
995   Value list(collection.data_type(), d.Size(),
996              Decoder(encoded->data(), encoded_size, value->protocol_version()));
997   fields_[name] = MetadataField(name, list, encoded);
998 }
999 
add_json_map_field(const Row * row,const String & name)1000 const Value* MetadataBase::add_json_map_field(const Row* row, const String& name) {
1001   const Value* value = row->get_by_name(name);
1002   if (value == NULL) return NULL;
1003   if (value->is_null()) {
1004     return (fields_[name] = MetadataField(name)).value();
1005   }
1006 
1007   Vector<char> buf = value->decoder().as_vector();
1008   json::Document d;
1009   d.ParseInsitu(&buf[0]);
1010 
1011   if (d.HasParseError()) {
1012     LOG_ERROR("Unable to parse JSON (object) for column '%s'", name.c_str());
1013     return (fields_[name] = MetadataField(name)).value();
1014   }
1015 
1016   if (!d.IsObject()) {
1017     LOG_DEBUG("Expected JSON object for column '%s' (probably null or empty)", name.c_str());
1018     fields_[name] = MetadataField(name);
1019     return (fields_[name] = MetadataField(name)).value();
1020   }
1021 
1022   Collection collection(CollectionType::map(DataType::Ptr(new DataType(CASS_VALUE_TYPE_TEXT)),
1023                                             DataType::Ptr(new DataType(CASS_VALUE_TYPE_TEXT)),
1024                                             false),
1025                         2 * d.MemberCount());
1026   for (json::Value::ConstMemberIterator i = d.MemberBegin(); i != d.MemberEnd(); ++i) {
1027     collection.append(CassString(i->name.GetString(), i->name.GetStringLength()));
1028     collection.append(CassString(i->value.GetString(), i->value.GetStringLength()));
1029   }
1030 
1031   size_t encoded_size = collection.get_items_size();
1032   RefBuffer::Ptr encoded(RefBuffer::create(encoded_size));
1033 
1034   collection.encode_items(encoded->data());
1035 
1036   Value map(collection.data_type(), d.MemberCount(),
1037             Decoder(encoded->data(), encoded_size, value->protocol_version()));
1038 
1039   return (fields_[name] = MetadataField(name, map, encoded)).value();
1040 }
1041 
get_table(const String & name) const1042 const TableMetadata* KeyspaceMetadata::get_table(const String& name) const {
1043   TableMetadata::Map::const_iterator i = tables_->find(name);
1044   if (i == tables_->end()) return NULL;
1045   return i->second.get();
1046 }
1047 
get_table(const String & name)1048 const TableMetadata::Ptr& KeyspaceMetadata::get_table(const String& name) {
1049   TableMetadata::Map::iterator i = tables_->find(name);
1050   if (i == tables_->end()) return TableMetadata::NIL;
1051   return i->second;
1052 }
1053 
add_table(const TableMetadata::Ptr & table)1054 void KeyspaceMetadata::add_table(const TableMetadata::Ptr& table) {
1055   TableMetadata::Map::iterator table_it = tables_->find(table->name());
1056 
1057   // If there's a previous version of this table then copy its views
1058   // to the new version of the table, and update the table back-refs
1059   // in the views.
1060   if (table_it != tables_->end()) {
1061     TableMetadata::Ptr old_table(table_it->second);
1062     internal_add_table(table, old_table->views());
1063   } else {
1064     (*tables_)[table->name()] = table; // Add new table
1065   }
1066 }
1067 
internal_add_table(const TableMetadata::Ptr & table,const ViewMetadata::Vec & views)1068 void KeyspaceMetadata::internal_add_table(const TableMetadata::Ptr& table,
1069                                           const ViewMetadata::Vec& views) {
1070   // Copy all the views and update the table and keyspace views
1071   for (ViewMetadata::Vec::const_iterator i = views.begin(); i != views.end(); ++i) {
1072     ViewMetadata::Ptr view(new ViewMetadata(**i, table.get()));
1073     table->add_view(view);
1074     (*views_)[view->name()] = view;
1075   }
1076   (*tables_)[table->name()] = table;
1077 }
1078 
get_view(const String & name) const1079 const ViewMetadata* KeyspaceMetadata::get_view(const String& name) const {
1080   ViewMetadata::Map::const_iterator i = views_->find(name);
1081   if (i == views_->end()) return NULL;
1082   return i->second.get();
1083 }
1084 
get_view(const String & name)1085 const ViewMetadata::Ptr& KeyspaceMetadata::get_view(const String& name) {
1086   ViewMetadata::Map::iterator i = views_->find(name);
1087   if (i == views_->end()) return ViewMetadata::NIL;
1088   return i->second;
1089 }
1090 
add_view(const ViewMetadata::Ptr & view)1091 void KeyspaceMetadata::add_view(const ViewMetadata::Ptr& view) { (*views_)[view->name()] = view; }
1092 
drop_table_or_view(const String & table_or_view_name)1093 void KeyspaceMetadata::drop_table_or_view(const String& table_or_view_name) {
1094   TableMetadata::Map::iterator table_it = tables_->find(table_or_view_name);
1095   if (table_it != tables_->end()) { // The name is for a table, remove the
1096     // table and views from keyspace
1097     TableMetadata::Ptr table(table_it->second);
1098     // Cassandra doesn't allow for tables to be dropped while it has active
1099     // views, but it could be possible for the drop events to arrive out of
1100     // order.
1101     for (ViewMetadata::Vec::const_iterator i = table->views().begin(), end = table->views().end();
1102          i != end; ++i) {
1103       views_->erase((*i)->name());
1104     }
1105     tables_->erase(table_it);
1106   } else { // The name is for a view, remove the view from the table and keyspace
1107     ViewMetadata::Map::iterator view_it = views_->find(table_or_view_name);
1108     if (view_it != views_->end()) {
1109       ViewMetadata::Ptr view(view_it->second);
1110 
1111       // Remove view from the base table's views
1112       ViewMetadata::Vec views(view->base_table()->views());
1113       ViewMetadata::Vec::iterator i =
1114           std::lower_bound(views.begin(), views.end(), table_or_view_name);
1115       if (i != views.end() && (*i)->name() == table_or_view_name) {
1116         views.erase(i);
1117       }
1118 
1119       // Create and add a new copy of the base table
1120       TableMetadata::Ptr table(new TableMetadata(*view->base_table()));
1121       internal_add_table(table, views);
1122 
1123       // Remove the dropped view
1124       views_->erase(view_it);
1125     }
1126   }
1127 }
1128 
get_or_create_user_type(const String & name,bool is_frozen)1129 const UserType::Ptr& KeyspaceMetadata::get_or_create_user_type(const String& name, bool is_frozen) {
1130   UserType::Map::iterator i = user_types_->find(name);
1131   if (i == user_types_->end()) {
1132     i = user_types_
1133             ->insert(std::make_pair(
1134                 name, UserType::Ptr(new UserType(MetadataBase::name(), name, is_frozen))))
1135             .first;
1136   }
1137   return i->second;
1138 }
1139 
get_user_type(const String & name) const1140 const UserType* KeyspaceMetadata::get_user_type(const String& name) const {
1141   UserType::Map::const_iterator i = user_types_->find(name);
1142   if (i == user_types_->end()) return NULL;
1143   return i->second.get();
1144 }
1145 
update(const VersionNumber & server_version,const RefBuffer::Ptr & buffer,const Row * row)1146 void KeyspaceMetadata::update(const VersionNumber& server_version, const RefBuffer::Ptr& buffer,
1147                               const Row* row) {
1148   add_field(buffer, row, "keyspace_name");
1149   add_field(buffer, row, "durable_writes");
1150   if (server_version >= VersionNumber(3, 0, 0)) {
1151     const Value* map = add_field(buffer, row, "replication");
1152     if (map != NULL && map->value_type() == CASS_VALUE_TYPE_MAP &&
1153         is_string_type(map->primary_value_type()) && is_string_type(map->secondary_value_type())) {
1154       MapIterator iterator(map);
1155       while (iterator.next()) {
1156         const Value* key = iterator.key();
1157         const Value* value = iterator.value();
1158         if (key->to_string_ref() == "class") {
1159           strategy_class_ = value->to_string_ref();
1160         }
1161       }
1162       strategy_options_ = *map;
1163     }
1164   } else {
1165     const Value* value = add_field(buffer, row, "strategy_class");
1166     if (value != NULL && is_string_type(value->value_type())) {
1167       strategy_class_ = value->to_string_ref();
1168     }
1169     const Value* map = add_json_map_field(row, "strategy_options");
1170     if (map != NULL) {
1171       strategy_options_ = *map;
1172     }
1173   }
1174 }
1175 
drop_user_type(const String & type_name)1176 void KeyspaceMetadata::drop_user_type(const String& type_name) { user_types_->erase(type_name); }
1177 
add_function(const FunctionMetadata::Ptr & function)1178 void KeyspaceMetadata::add_function(const FunctionMetadata::Ptr& function) {
1179   (*functions_)[function->name()] = function;
1180 }
1181 
get_function(const String & full_function_name) const1182 const FunctionMetadata* KeyspaceMetadata::get_function(const String& full_function_name) const {
1183   FunctionMetadata::Map::const_iterator i = functions_->find(full_function_name);
1184   if (i == functions_->end()) return NULL;
1185   return i->second.get();
1186 }
1187 
drop_function(const String & full_function_name)1188 void KeyspaceMetadata::drop_function(const String& full_function_name) {
1189   functions_->erase(full_function_name);
1190 }
1191 
get_aggregate(const String & full_aggregate_name) const1192 const AggregateMetadata* KeyspaceMetadata::get_aggregate(const String& full_aggregate_name) const {
1193   AggregateMetadata::Map::const_iterator i = aggregates_->find(full_aggregate_name);
1194   if (i == aggregates_->end()) return NULL;
1195   return i->second.get();
1196 }
1197 
add_aggregate(const AggregateMetadata::Ptr & aggregate)1198 void KeyspaceMetadata::add_aggregate(const AggregateMetadata::Ptr& aggregate) {
1199   (*aggregates_)[aggregate->name()] = aggregate;
1200 }
1201 
drop_aggregate(const String & full_aggregate_name)1202 void KeyspaceMetadata::drop_aggregate(const String& full_aggregate_name) {
1203   aggregates_->erase(full_aggregate_name);
1204 }
1205 
TableMetadataBase(const VersionNumber & server_version,const String & name,const RefBuffer::Ptr & buffer,const Row * row,bool is_virtual)1206 TableMetadataBase::TableMetadataBase(const VersionNumber& server_version, const String& name,
1207                                      const RefBuffer::Ptr& buffer, const Row* row, bool is_virtual)
1208     : MetadataBase(name)
1209     , is_virtual_(is_virtual) {
1210   add_field(buffer, row, "keyspace_name");
1211   add_field(buffer, row, "bloom_filter_fp_chance");
1212   add_field(buffer, row, "caching");
1213   add_field(buffer, row, "comment");
1214   add_field(buffer, row, "default_time_to_live");
1215   add_field(buffer, row, "gc_grace_seconds");
1216   add_field(buffer, row, "id");
1217   add_field(buffer, row, "speculative_retry");
1218   add_field(buffer, row, "max_index_interval");
1219   add_field(buffer, row, "min_index_interval");
1220   add_field(buffer, row, "memtable_flush_period_in_ms");
1221   add_field(buffer, row, "read_repair_chance");
1222 
1223   if (server_version >= VersionNumber(3, 0, 0)) {
1224     add_field(buffer, row, "dclocal_read_repair_chance");
1225     add_field(buffer, row, "crc_check_chance");
1226     add_field(buffer, row, "compaction");
1227     add_field(buffer, row, "compression");
1228     add_field(buffer, row, "extensions");
1229   } else {
1230     add_field(buffer, row, "cf_id");
1231     add_field(buffer, row, "local_read_repair_chance");
1232 
1233     add_field(buffer, row, "compaction_strategy_class");
1234     add_json_map_field(row, "compaction_strategy_options");
1235     add_json_map_field(row, "compression_parameters");
1236 
1237     add_json_list_field(row, "column_aliases");
1238     add_field(buffer, row, "comparator");
1239     add_field(buffer, row, "subcomparator");
1240     add_field(buffer, row, "default_validator");
1241     add_field(buffer, row, "key_alias");
1242     add_json_list_field(row, "key_aliases");
1243     add_field(buffer, row, "value_alias");
1244     add_field(buffer, row, "key_validator");
1245     add_field(buffer, row, "type");
1246 
1247     add_field(buffer, row, "dropped_columns");
1248     add_field(buffer, row, "index_interval");
1249     add_field(buffer, row, "is_dense");
1250     add_field(buffer, row, "max_compaction_threshold");
1251     add_field(buffer, row, "min_compaction_threshold");
1252     add_field(buffer, row, "populate_io_cache_on_flush");
1253     add_field(buffer, row, "replicate_on_write");
1254   }
1255 }
1256 
get_column(const String & name) const1257 const ColumnMetadata* TableMetadataBase::get_column(const String& name) const {
1258   ColumnMetadata::Map::const_iterator i = columns_by_name_.find(name);
1259   if (i == columns_by_name_.end()) return NULL;
1260   return i->second.get();
1261 }
1262 
add_column(const VersionNumber & server_version,const ColumnMetadata::Ptr & column)1263 void TableMetadataBase::add_column(const VersionNumber& server_version,
1264                                    const ColumnMetadata::Ptr& column) {
1265   if (columns_by_name_.insert(std::make_pair(column->name(), column)).second) {
1266     columns_.push_back(column);
1267   }
1268 }
1269 
clear_columns()1270 void TableMetadataBase::clear_columns() {
1271   columns_.clear();
1272   columns_by_name_.clear();
1273   partition_key_.clear();
1274   clustering_key_.clear();
1275 }
1276 
get_column_count(const ColumnMetadata::Vec & columns,CassColumnType type)1277 size_t get_column_count(const ColumnMetadata::Vec& columns, CassColumnType type) {
1278   size_t count = 0;
1279   for (ColumnMetadata::Vec::const_iterator i = columns.begin(), end = columns.end(); i != end;
1280        ++i) {
1281     if ((*i)->type() == type) count++;
1282   }
1283   return count;
1284 }
1285 
build_keys_and_sort(const VersionNumber & server_version,SimpleDataTypeCache & cache)1286 void TableMetadataBase::build_keys_and_sort(const VersionNumber& server_version,
1287                                             SimpleDataTypeCache& cache) {
1288   // Also, Reorders columns so that the order is:
1289   // 1) Parition key
1290   // 2) Clustering keys
1291   // 3) Other columns
1292 
1293   if (server_version.major_version() >= 2) {
1294     partition_key_.resize(get_column_count(columns_, CASS_COLUMN_TYPE_PARTITION_KEY));
1295     clustering_key_.resize(get_column_count(columns_, CASS_COLUMN_TYPE_CLUSTERING_KEY));
1296     clustering_key_order_.resize(clustering_key_.size(), CASS_CLUSTERING_ORDER_NONE);
1297     for (ColumnMetadata::Vec::const_iterator i = columns_.begin(), end = columns_.end(); i != end;
1298          ++i) {
1299       const ColumnMetadata::Ptr& column(*i);
1300       if (column->type() == CASS_COLUMN_TYPE_PARTITION_KEY && column->position() >= 0 &&
1301           static_cast<size_t>(column->position()) < partition_key_.size()) {
1302         partition_key_[column->position()] = column;
1303       } else if (column->type() == CASS_COLUMN_TYPE_CLUSTERING_KEY && column->position() >= 0 &&
1304                  static_cast<size_t>(column->position()) < clustering_key_.size()) {
1305         clustering_key_[column->position()] = column;
1306         clustering_key_order_[column->position()] =
1307             column->is_reversed() ? CASS_CLUSTERING_ORDER_DESC : CASS_CLUSTERING_ORDER_ASC;
1308       }
1309     }
1310 
1311     ColumnMetadata::Vec columns;
1312     columns.reserve(columns_.size());
1313 
1314     std::copy(partition_key_.begin(), partition_key_.end(),
1315               std::back_inserter<ColumnMetadata::Vec>(columns));
1316     std::copy(clustering_key_.begin(), clustering_key_.end(),
1317               std::back_inserter<ColumnMetadata::Vec>(columns));
1318 
1319     for (ColumnMetadata::Vec::const_iterator i = columns_.begin(), end = columns_.end(); i != end;
1320          ++i) {
1321       const ColumnMetadata::Ptr& column(*i);
1322       if (column->type() != CASS_COLUMN_TYPE_PARTITION_KEY &&
1323           column->type() != CASS_COLUMN_TYPE_CLUSTERING_KEY) {
1324         columns.push_back(column);
1325       }
1326     }
1327 
1328     columns_.swap(columns);
1329   } else {
1330     // Cassandra 1.2 requires a lot more work because "system.schema_columns" only
1331     // contains regular columns.
1332 
1333     // Partition key
1334     {
1335       StringRefVec key_aliases;
1336       const Value* key_aliases_value = get_field("key_aliases");
1337       if (key_aliases_value != NULL) {
1338         CollectionIterator iterator(key_aliases_value);
1339         while (iterator.next()) {
1340           key_aliases.push_back(iterator.value()->to_string_ref());
1341         }
1342       }
1343 
1344       ParseResult::Ptr key_validator =
1345           DataTypeClassNameParser::parse_with_composite(get_string_field("key_validator"), cache);
1346       size_t size = key_validator->types().size();
1347       partition_key_.reserve(size);
1348       for (size_t i = 0; i < size; ++i) {
1349         String key_alias;
1350         if (i < key_aliases.size()) {
1351           key_alias = key_aliases[i].to_string();
1352         } else {
1353           OStringStream ss("key");
1354           if (i > 0) {
1355             ss << i + 1;
1356           }
1357           key_alias = ss.str();
1358         }
1359         partition_key_.push_back(ColumnMetadata::Ptr(
1360             new ColumnMetadata(key_alias, partition_key_.size(), CASS_COLUMN_TYPE_PARTITION_KEY,
1361                                key_validator->types()[i])));
1362       }
1363     }
1364 
1365     // Clustering key
1366     {
1367       StringRefVec column_aliases;
1368       const Value* column_aliases_value = get_field("column_aliases");
1369       if (column_aliases_value != NULL) {
1370         CollectionIterator iterator(column_aliases_value);
1371         while (iterator.next()) {
1372           column_aliases.push_back(iterator.value()->to_string_ref());
1373         }
1374       }
1375 
1376       // TODO: Figure out how to test these special cases and properly document them here
1377       ParseResult::Ptr comparator =
1378           DataTypeClassNameParser::parse_with_composite(get_string_field("comparator"), cache);
1379       size_t size = comparator->types().size();
1380       if (comparator->is_composite()) {
1381         if (!comparator->collections().empty() ||
1382             (column_aliases.size() == size - 1 &&
1383              comparator->types().back()->value_type() == CASS_VALUE_TYPE_TEXT)) {
1384           size = size - 1;
1385         }
1386       } else {
1387         size = !column_aliases.empty() || columns_.empty() ? size : 0;
1388       }
1389       clustering_key_.reserve(size);
1390       for (size_t i = 0; i < size; ++i) {
1391         String column_alias;
1392         if (i < column_aliases.size()) {
1393           column_alias = column_aliases[i].to_string();
1394         } else {
1395           OStringStream ss("column");
1396           if (i > 0) {
1397             ss << i + 1;
1398           }
1399           column_alias = ss.str();
1400         }
1401         clustering_key_.push_back(ColumnMetadata::Ptr(
1402             new ColumnMetadata(column_alias, clustering_key_.size(),
1403                                CASS_COLUMN_TYPE_CLUSTERING_KEY, comparator->types()[i])));
1404         clustering_key_order_.push_back(comparator->reversed()[i] ? CASS_CLUSTERING_ORDER_DESC
1405                                                                   : CASS_CLUSTERING_ORDER_ASC);
1406       }
1407     }
1408 
1409     // TODO: Handle value alias column
1410 
1411     ColumnMetadata::Vec columns(partition_key_.size() + clustering_key_.size() + columns_.size());
1412 
1413     ColumnMetadata::Vec::iterator pos = columns.begin();
1414     pos = std::copy(partition_key_.begin(), partition_key_.end(), pos);
1415     pos = std::copy(clustering_key_.begin(), clustering_key_.end(), pos);
1416     std::copy(columns_.begin(), columns_.end(), pos);
1417 
1418     columns_.swap(columns);
1419   }
1420 }
1421 
1422 const TableMetadata::Ptr TableMetadata::NIL;
1423 
TableMetadata(const VersionNumber & server_version,const String & name,const RefBuffer::Ptr & buffer,const Row * row,bool is_virtual)1424 TableMetadata::TableMetadata(const VersionNumber& server_version, const String& name,
1425                              const RefBuffer::Ptr& buffer, const Row* row, bool is_virtual)
1426     : TableMetadataBase(server_version, name, buffer, row, is_virtual) {
1427   add_field(buffer, row, table_column_name(server_version));
1428   if (server_version >= VersionNumber(3, 0, 0)) {
1429     add_field(buffer, row, "flags");
1430   }
1431 }
1432 
add_column(const VersionNumber & server_version,const ColumnMetadata::Ptr & column)1433 void TableMetadata::add_column(const VersionNumber& server_version,
1434                                const ColumnMetadata::Ptr& column) {
1435   if (server_version >= VersionNumber(3, 0, 0)) {
1436     if (column->type() == CASS_COLUMN_TYPE_REGULAR && column->data_type()->is_custom()) {
1437       const CustomType* customType = static_cast<const CustomType*>(column->data_type().get());
1438       if (customType->class_name() == EMPTY_TYPE) {
1439         // Don't add this column; it's a surrogate column in a dense table and
1440         // should not be exposed to the user.
1441         return;
1442       }
1443     }
1444   } else if (column->type() == CASS_COLUMN_TYPE_COMPACT_VALUE && column->name().empty()) {
1445     // Don't add this column; it's a surrogate column in a dense table and
1446     // should not be exposed to the user.
1447     return;
1448   }
1449   TableMetadataBase::add_column(server_version, column);
1450 }
1451 
get_view(const String & name) const1452 const ViewMetadata* TableMetadata::get_view(const String& name) const {
1453   ViewMetadata::Vec::const_iterator i = std::lower_bound(views_.begin(), views_.end(), name);
1454   if (i == views_.end() || (*i)->name() != name) return NULL;
1455   return i->get();
1456 }
1457 
add_view(const ViewMetadata::Ptr & view)1458 void TableMetadata::add_view(const ViewMetadata::Ptr& view) { views_.push_back(view); }
1459 
sort_views()1460 void TableMetadata::sort_views() { std::sort(views_.begin(), views_.end()); }
1461 
key_aliases(SimpleDataTypeCache & cache,KeyAliases * output) const1462 void TableMetadata::key_aliases(SimpleDataTypeCache& cache, KeyAliases* output) const {
1463   const Value* aliases = get_field("key_aliases");
1464   if (aliases != NULL) {
1465     output->reserve(aliases->count());
1466     CollectionIterator itr(aliases);
1467     while (itr.next()) {
1468       output->push_back(itr.value()->to_string());
1469     }
1470   }
1471   if (output->empty()) { // C* 1.2 tables created via CQL2 or thrift don't have col meta or key
1472                          // aliases
1473     ParseResult::Ptr key_validator_type =
1474         DataTypeClassNameParser::parse_with_composite(get_string_field("key_validator"), cache);
1475     const size_t count = key_validator_type->types().size();
1476     OStringStream ss("key");
1477     for (size_t i = 0; i < count; ++i) {
1478       if (i > 0) {
1479         ss.seekp(3); // position after "key"
1480         ss << i + 1;
1481       }
1482       output->push_back(ss.str());
1483     }
1484   }
1485 }
1486 
1487 const ViewMetadata::Ptr ViewMetadata::NIL;
1488 
ViewMetadata(const VersionNumber & server_version,const TableMetadata * table,const String & name,const RefBuffer::Ptr & buffer,const Row * row,bool is_virtual)1489 ViewMetadata::ViewMetadata(const VersionNumber& server_version, const TableMetadata* table,
1490                            const String& name, const RefBuffer::Ptr& buffer, const Row* row,
1491                            bool is_virtual)
1492     : TableMetadataBase(server_version, name, buffer, row, is_virtual)
1493     , base_table_(table) {
1494   add_field(buffer, row, "keyspace_name");
1495   add_field(buffer, row, "view_name");
1496   add_field(buffer, row, "base_table_name");
1497   add_field(buffer, row, "base_table_id");
1498   add_field(buffer, row, "include_all_columns");
1499   add_field(buffer, row, "where_clause");
1500 }
1501 
get_index(const String & name) const1502 const IndexMetadata* TableMetadata::get_index(const String& name) const {
1503   IndexMetadata::Map::const_iterator i = indexes_by_name_.find(name);
1504   if (i == indexes_by_name_.end()) return NULL;
1505   return i->second.get();
1506 }
1507 
add_index(const IndexMetadata::Ptr & index)1508 void TableMetadata::add_index(const IndexMetadata::Ptr& index) {
1509   if (indexes_by_name_.insert(std::make_pair(index->name(), index)).second) {
1510     indexes_.push_back(index);
1511   }
1512 }
1513 
clear_indexes()1514 void TableMetadata::clear_indexes() {
1515   indexes_.clear();
1516   indexes_by_name_.clear();
1517 }
1518 
FunctionMetadata(const VersionNumber & server_version,SimpleDataTypeCache & cache,const String & name,const Value * signature,KeyspaceMetadata * keyspace,const RefBuffer::Ptr & buffer,const Row * row)1519 FunctionMetadata::FunctionMetadata(const VersionNumber& server_version, SimpleDataTypeCache& cache,
1520                                    const String& name, const Value* signature,
1521                                    KeyspaceMetadata* keyspace, const RefBuffer::Ptr& buffer,
1522                                    const Row* row)
1523     : MetadataBase(Metadata::full_function_name(name, signature->as_stringlist()))
1524     , simple_name_(name) {
1525   const Value* value1;
1526   const Value* value2;
1527 
1528   add_field(buffer, row, "keyspace_name");
1529   add_field(buffer, row, "function_name");
1530 
1531   value1 = add_field(buffer, row, "argument_names");
1532   value2 = add_field(buffer, row, "argument_types");
1533   if (value1 != NULL && value1->value_type() == CASS_VALUE_TYPE_LIST &&
1534       value1->primary_value_type() == CASS_VALUE_TYPE_VARCHAR && value2 != NULL &&
1535       value2->value_type() == CASS_VALUE_TYPE_LIST &&
1536       value2->primary_value_type() == CASS_VALUE_TYPE_VARCHAR) {
1537     CollectionIterator iterator1(value1);
1538     CollectionIterator iterator2(value2);
1539     if (server_version >= VersionNumber(3, 0, 0)) {
1540       while (iterator1.next() && iterator2.next()) {
1541         StringRef arg_name(iterator1.value()->to_string_ref());
1542         DataType::ConstPtr arg_type(
1543             DataTypeCqlNameParser::parse(iterator2.value()->to_string(), cache, keyspace));
1544         args_.push_back(Argument(arg_name, arg_type));
1545       }
1546     } else {
1547       while (iterator1.next() && iterator2.next()) {
1548         StringRef arg_name(iterator1.value()->to_string_ref());
1549         DataType::ConstPtr arg_type(
1550             DataTypeClassNameParser::parse_one(iterator2.value()->to_string(), cache));
1551         args_.push_back(Argument(arg_name, arg_type));
1552       }
1553     }
1554   }
1555 
1556   value1 = add_field(buffer, row, "return_type");
1557   if (value1 != NULL && value1->value_type() == CASS_VALUE_TYPE_VARCHAR) {
1558     if (server_version >= VersionNumber(3, 0, 0)) {
1559       return_type_ = DataTypeCqlNameParser::parse(value1->to_string(), cache, keyspace);
1560     } else {
1561       return_type_ = DataTypeClassNameParser::parse_one(value1->to_string(), cache);
1562     }
1563   }
1564 
1565   value1 = add_field(buffer, row, "body");
1566   if (value1 != NULL && value1->value_type() == CASS_VALUE_TYPE_VARCHAR) {
1567     body_ = value1->to_string_ref();
1568   }
1569 
1570   value1 = add_field(buffer, row, "language");
1571   if (value1 != NULL && value1->value_type() == CASS_VALUE_TYPE_VARCHAR) {
1572     language_ = value1->to_string_ref();
1573   }
1574 
1575   value1 = add_field(buffer, row, "called_on_null_input");
1576   if (value1 != NULL && value1->value_type() == CASS_VALUE_TYPE_BOOLEAN) {
1577     called_on_null_input_ = value1->as_bool();
1578   }
1579 }
1580 
get_arg_type(StringRef name) const1581 const DataType* FunctionMetadata::get_arg_type(StringRef name) const {
1582   Argument::Vec::const_iterator i = std::find(args_.begin(), args_.end(), name);
1583   if (i == args_.end()) return NULL;
1584   return i->type.get();
1585 }
1586 
AggregateMetadata(const VersionNumber & server_version,SimpleDataTypeCache & cache,const String & name,const Value * signature,KeyspaceMetadata * keyspace,const RefBuffer::Ptr & buffer,const Row * row)1587 AggregateMetadata::AggregateMetadata(const VersionNumber& server_version,
1588                                      SimpleDataTypeCache& cache, const String& name,
1589                                      const Value* signature, KeyspaceMetadata* keyspace,
1590                                      const RefBuffer::Ptr& buffer, const Row* row)
1591     : MetadataBase(Metadata::full_function_name(name, signature->as_stringlist()))
1592     , simple_name_(name) {
1593   const Value* value;
1594   const FunctionMetadata::Map& functions = keyspace->functions();
1595 
1596   add_field(buffer, row, "keyspace_name");
1597   add_field(buffer, row, "aggregate_name");
1598 
1599   value = add_field(buffer, row, "argument_types");
1600   if (value != NULL && value->value_type() == CASS_VALUE_TYPE_LIST &&
1601       value->primary_value_type() == CASS_VALUE_TYPE_VARCHAR) {
1602     CollectionIterator iterator(value);
1603     if (server_version >= VersionNumber(3, 0, 0)) {
1604       while (iterator.next()) {
1605         arg_types_.push_back(
1606             DataTypeCqlNameParser::parse(iterator.value()->to_string(), cache, keyspace));
1607       }
1608     } else {
1609       while (iterator.next()) {
1610         arg_types_.push_back(
1611             DataTypeClassNameParser::parse_one(iterator.value()->to_string(), cache));
1612       }
1613     }
1614   }
1615 
1616   value = add_field(buffer, row, "return_type");
1617   if (value != NULL && value->value_type() == CASS_VALUE_TYPE_VARCHAR) {
1618     if (server_version >= VersionNumber(3, 0, 0)) {
1619       return_type_ = DataTypeCqlNameParser::parse(value->to_string(), cache, keyspace);
1620     } else {
1621       return_type_ = DataTypeClassNameParser::parse_one(value->to_string(), cache);
1622     }
1623   }
1624 
1625   value = add_field(buffer, row, "state_type");
1626   if (value != NULL && value->value_type() == CASS_VALUE_TYPE_VARCHAR) {
1627     if (server_version >= VersionNumber(3, 0, 0)) {
1628       state_type_ = DataTypeCqlNameParser::parse(value->to_string(), cache, keyspace);
1629     } else {
1630       state_type_ = DataTypeClassNameParser::parse_one(value->to_string(), cache);
1631     }
1632   }
1633 
1634   value = add_field(buffer, row, "final_func");
1635   if (value != NULL && value->value_type() == CASS_VALUE_TYPE_VARCHAR) {
1636     StringVec final_func_signature;
1637     final_func_signature.push_back(state_type_->to_string());
1638     String full_final_func_name(
1639         Metadata::full_function_name(value->to_string(), final_func_signature));
1640     FunctionMetadata::Map::const_iterator i = functions.find(full_final_func_name);
1641     if (i != functions.end()) final_func_ = i->second;
1642   }
1643 
1644   value = add_field(buffer, row, "state_func");
1645   if (value != NULL && value->value_type() == CASS_VALUE_TYPE_VARCHAR) {
1646     StringVec state_func_signature;
1647     state_func_signature.push_back(state_type_->to_string());
1648     CollectionIterator iterator(signature);
1649     while (iterator.next()) {
1650       state_func_signature.push_back(iterator.value()->to_string());
1651     }
1652     String full_state_func_name(
1653         Metadata::full_function_name(value->to_string(), state_func_signature));
1654     FunctionMetadata::Map::const_iterator i = functions.find(full_state_func_name);
1655     if (i != functions.end()) state_func_ = i->second;
1656   }
1657 
1658   value = add_field(buffer, row, "initcond");
1659   if (value != NULL) {
1660     if (value->value_type() == CASS_VALUE_TYPE_BLOB) {
1661       init_cond_ = Value(state_type_, value->decoder());
1662     } else if (server_version >= VersionNumber(3, 0, 0) &&
1663                value->value_type() == CASS_VALUE_TYPE_VARCHAR) {
1664       init_cond_ = Value(cache.by_value_type(CASS_VALUE_TYPE_VARCHAR), value->decoder());
1665     }
1666   }
1667 }
1668 
from_row(const String & index_name,const RefBuffer::Ptr & buffer,const Row * row)1669 IndexMetadata::Ptr IndexMetadata::from_row(const String& index_name, const RefBuffer::Ptr& buffer,
1670                                            const Row* row) {
1671   IndexMetadata::Ptr index(new IndexMetadata(index_name));
1672 
1673   StringRef kind;
1674   const Value* value = index->add_field(buffer, row, "kind");
1675   if (value != NULL && value->value_type() == CASS_VALUE_TYPE_VARCHAR) {
1676     kind = value->to_string_ref();
1677   }
1678 
1679   const Value* options = index->add_field(buffer, row, "options");
1680   index->update(kind, options);
1681 
1682   return index;
1683 }
1684 
update(StringRef kind,const Value * options)1685 void IndexMetadata::update(StringRef kind, const Value* options) {
1686   type_ = index_type_from_string(kind);
1687 
1688   if (options != NULL && options->value_type() == CASS_VALUE_TYPE_MAP) {
1689     MapIterator iterator(options);
1690     while (iterator.next()) {
1691       if (iterator.key()->to_string_ref() == "target") {
1692         target_ = iterator.value()->to_string();
1693       }
1694     }
1695   }
1696 
1697   options_ = *options;
1698 }
1699 
from_legacy(const String & index_name,const ColumnMetadata * column,const RefBuffer::Ptr & buffer,const Row * row)1700 IndexMetadata::Ptr IndexMetadata::from_legacy(const String& index_name,
1701                                               const ColumnMetadata* column,
1702                                               const RefBuffer::Ptr& buffer, const Row* row) {
1703   IndexMetadata::Ptr index(new IndexMetadata(index_name));
1704 
1705   index->add_field(buffer, row, "index_name");
1706 
1707   StringRef index_type;
1708   const Value* value = index->add_field(buffer, row, "index_type");
1709   if (value != NULL && value->value_type() == CASS_VALUE_TYPE_VARCHAR) {
1710     index_type = value->to_string_ref();
1711   }
1712 
1713   const Value* options = index->add_json_map_field(row, "index_options");
1714   index->update_legacy(index_type, column, options);
1715 
1716   return index;
1717 }
1718 
update_legacy(StringRef index_type,const ColumnMetadata * column,const Value * options)1719 void IndexMetadata::update_legacy(StringRef index_type, const ColumnMetadata* column,
1720                                   const Value* options) {
1721   type_ = index_type_from_string(index_type);
1722   target_ = target_from_legacy(column, options);
1723   options_ = *options;
1724 }
1725 
target_from_legacy(const ColumnMetadata * column,const Value * options)1726 String IndexMetadata::target_from_legacy(const ColumnMetadata* column, const Value* options) {
1727   String column_name(column->name());
1728 
1729   escape_id(column_name);
1730 
1731   if (options != NULL && options->value_type() == CASS_VALUE_TYPE_MAP) {
1732     MapIterator iterator(options);
1733 
1734     while (iterator.next()) {
1735       String key(iterator.key()->to_string());
1736       if (key.find("index_keys") != String::npos) {
1737         return "keys(" + column_name + ")";
1738       } else if (key.find("index_keys_and_values") != String::npos) {
1739         return "entries(" + column_name + ")";
1740       } else if (column->data_type()->is_collection()) { // TODO(mpenick): && is_frozen()
1741         return "full(" + column_name + ")";
1742       }
1743     }
1744   }
1745 
1746   return column_name;
1747 }
1748 
index_type_from_string(StringRef index_type)1749 CassIndexType IndexMetadata::index_type_from_string(StringRef index_type) {
1750   if (index_type.iequals("keys")) {
1751     return CASS_INDEX_TYPE_KEYS;
1752   } else if (index_type.iequals("custom")) {
1753     return CASS_INDEX_TYPE_CUSTOM;
1754   } else if (index_type.iequals("composites")) {
1755     return CASS_INDEX_TYPE_COMPOSITES;
1756   }
1757   return CASS_INDEX_TYPE_UNKNOWN;
1758 }
1759 
ColumnMetadata(const VersionNumber & server_version,SimpleDataTypeCache & cache,const String & name,KeyspaceMetadata * keyspace,const RefBuffer::Ptr & buffer,const Row * row)1760 ColumnMetadata::ColumnMetadata(const VersionNumber& server_version, SimpleDataTypeCache& cache,
1761                                const String& name, KeyspaceMetadata* keyspace,
1762                                const RefBuffer::Ptr& buffer, const Row* row)
1763     : MetadataBase(name)
1764     , type_(CASS_COLUMN_TYPE_REGULAR)
1765     , position_(0)
1766     , is_reversed_(false) {
1767   const Value* value;
1768 
1769   add_field(buffer, row, "keyspace_name");
1770   add_field(buffer, row, table_column_name(server_version));
1771   add_field(buffer, row, "column_name");
1772 
1773   if (server_version >= VersionNumber(3, 0, 0)) {
1774     value = add_field(buffer, row, "clustering_order");
1775     if (value != NULL && value->value_type() == CASS_VALUE_TYPE_VARCHAR &&
1776         value->to_string_ref().iequals("desc")) {
1777       is_reversed_ = true;
1778     }
1779 
1780     add_field(buffer, row, "column_name_bytes");
1781 
1782     value = add_field(buffer, row, "kind");
1783     if (value != NULL && value->value_type() == CASS_VALUE_TYPE_VARCHAR) {
1784       StringRef type = value->to_string_ref();
1785       if (type == "partition_key") {
1786         type_ = CASS_COLUMN_TYPE_PARTITION_KEY;
1787       } else if (type == "clustering") {
1788         type_ = CASS_COLUMN_TYPE_CLUSTERING_KEY;
1789       } else if (type == "static") {
1790         type_ = CASS_COLUMN_TYPE_STATIC;
1791       } else {
1792         type_ = CASS_COLUMN_TYPE_REGULAR;
1793       }
1794     }
1795 
1796     value = add_field(buffer, row, "position");
1797     if (value != NULL && value->value_type() == CASS_VALUE_TYPE_INT) {
1798       position_ = value->as_int32();
1799       if (position_ < 0) position_ = 0;
1800     }
1801 
1802     value = add_field(buffer, row, "type");
1803     if (value != NULL && value->value_type() == CASS_VALUE_TYPE_VARCHAR) {
1804       String type(value->to_string());
1805       data_type_ = DataTypeCqlNameParser::parse(type, cache, keyspace);
1806     }
1807   } else {
1808     value = add_field(buffer, row, "type");
1809     if (value != NULL && value->value_type() == CASS_VALUE_TYPE_VARCHAR) {
1810       StringRef type = value->to_string_ref();
1811       if (type == "partition_key") {
1812         type_ = CASS_COLUMN_TYPE_PARTITION_KEY;
1813       } else if (type == "clustering_key") {
1814         type_ = CASS_COLUMN_TYPE_CLUSTERING_KEY;
1815       } else if (type == "static") {
1816         type_ = CASS_COLUMN_TYPE_STATIC;
1817       } else if (type == "compact_value") {
1818         type_ = CASS_COLUMN_TYPE_COMPACT_VALUE;
1819       } else {
1820         type_ = CASS_COLUMN_TYPE_REGULAR;
1821       }
1822     }
1823 
1824     value = add_field(buffer, row, "component_index");
1825     // For C* 2.0 to 2.2 this is "null" for single component partition keys
1826     // so the default position of 0 works. C* 1.2 and below don't use this.
1827     if (value != NULL && value->value_type() == CASS_VALUE_TYPE_INT) {
1828       position_ = value->as_int32();
1829     }
1830 
1831     value = add_field(buffer, row, "validator");
1832     if (value != NULL && value->value_type() == CASS_VALUE_TYPE_VARCHAR) {
1833       String validator(value->to_string());
1834       data_type_ = DataTypeClassNameParser::parse_one(validator, cache);
1835       is_reversed_ = DataTypeClassNameParser::is_reversed(validator);
1836     }
1837 
1838     add_field(buffer, row, "index_type");
1839     add_field(buffer, row, "index_name");
1840     add_json_map_field(row, "index_options");
1841   }
1842 }
1843 
update_keyspaces(const VersionNumber & server_version,const ResultResponse * result,bool is_virtual)1844 void Metadata::InternalData::update_keyspaces(const VersionNumber& server_version,
1845                                               const ResultResponse* result, bool is_virtual) {
1846   RefBuffer::Ptr buffer = result->buffer();
1847   ResultIterator rows(result);
1848 
1849   while (rows.next()) {
1850     String keyspace_name;
1851     const Row* row = rows.row();
1852 
1853     if (!row->get_string_by_name("keyspace_name", &keyspace_name)) {
1854       LOG_ERROR("Unable to get column value for 'keyspace_name'");
1855       continue;
1856     }
1857 
1858     KeyspaceMetadata* keyspace = get_or_create_keyspace(keyspace_name, is_virtual);
1859     keyspace->update(server_version, buffer, row);
1860   }
1861 }
1862 
update_tables(const VersionNumber & server_version,const ResultResponse * result)1863 void Metadata::InternalData::update_tables(const VersionNumber& server_version,
1864                                            const ResultResponse* result) {
1865   RefBuffer::Ptr buffer = result->buffer();
1866 
1867   ResultIterator rows(result);
1868 
1869   String keyspace_name;
1870   String table_name;
1871   KeyspaceMetadata* keyspace = NULL;
1872 
1873   while (rows.next()) {
1874     String temp_keyspace_name;
1875     const Row* row = rows.row();
1876 
1877     if (!row->get_string_by_name("keyspace_name", &temp_keyspace_name) ||
1878         !row->get_string_by_name(table_column_name(server_version), &table_name)) {
1879       LOG_ERROR("Unable to get column value for 'keyspace_name' or '%s'",
1880                 table_column_name(server_version));
1881       continue;
1882     }
1883 
1884     if (keyspace_name != temp_keyspace_name) {
1885       keyspace_name = temp_keyspace_name;
1886       keyspace = get_or_create_keyspace(keyspace_name);
1887     }
1888 
1889     keyspace->add_table(TableMetadata::Ptr(
1890         new TableMetadata(server_version, table_name, buffer, row, keyspace->is_virtual())));
1891   }
1892 }
1893 
update_views(const VersionNumber & server_version,const ResultResponse * result)1894 void Metadata::InternalData::update_views(const VersionNumber& server_version,
1895                                           const ResultResponse* result) {
1896   RefBuffer::Ptr buffer = result->buffer();
1897 
1898   ResultIterator rows(result);
1899 
1900   String keyspace_name;
1901   String view_name;
1902   KeyspaceMetadata* keyspace = NULL;
1903 
1904   TableMetadata::Vec updated_tables;
1905 
1906   while (rows.next()) {
1907     String temp_keyspace_name;
1908     String base_table_name;
1909     const Row* row = rows.row();
1910 
1911     if (!row->get_string_by_name("keyspace_name", &temp_keyspace_name) ||
1912         !row->get_string_by_name("view_name", &view_name)) {
1913       LOG_ERROR("Unable to get column value for 'keyspace_name' and 'view_name'");
1914       continue;
1915     }
1916 
1917     if (keyspace_name != temp_keyspace_name) {
1918       keyspace_name = temp_keyspace_name;
1919       keyspace = get_or_create_keyspace(keyspace_name);
1920     }
1921 
1922     if (!row->get_string_by_name("base_table_name", &base_table_name)) {
1923       LOG_ERROR("Unable to get column value for 'base_table_name'");
1924       continue;
1925     }
1926 
1927     // Properly remove the previous view if it exists. This needs to be done
1928     // before the next step of finding the table because it could create a
1929     // new copy of the old table.
1930     keyspace->drop_table_or_view(view_name);
1931 
1932     TableMetadata::Ptr table(keyspace->get_table(base_table_name));
1933     if (!table) {
1934       LOG_ERROR("No table metadata for view with base table name '%s'", base_table_name.c_str());
1935       continue;
1936     }
1937 
1938     ViewMetadata::Ptr view(new ViewMetadata(server_version, table.get(), view_name, buffer, row,
1939                                             keyspace->is_virtual()));
1940     keyspace->add_view(view);
1941     table->add_view(view);
1942     updated_tables.push_back(table);
1943   }
1944 
1945   for (TableMetadata::Vec::iterator i = updated_tables.begin(), end = updated_tables.end();
1946        i != end; ++i) {
1947     (*i)->sort_views();
1948   }
1949 }
1950 
update_user_types(const VersionNumber & server_version,SimpleDataTypeCache & cache,const ResultResponse * result)1951 void Metadata::InternalData::update_user_types(const VersionNumber& server_version,
1952                                                SimpleDataTypeCache& cache,
1953                                                const ResultResponse* result) {
1954   ResultIterator rows(result);
1955 
1956   String keyspace_name;
1957   KeyspaceMetadata* keyspace = NULL;
1958 
1959   while (rows.next()) {
1960     String temp_keyspace_name;
1961     String type_name;
1962     const Row* row = rows.row();
1963 
1964     if (!row->get_string_by_name("keyspace_name", &temp_keyspace_name) ||
1965         !row->get_string_by_name("type_name", &type_name)) {
1966       LOG_ERROR("Unable to get column value for 'keyspace_name' or 'type_name'");
1967       continue;
1968     }
1969 
1970     if (keyspace_name != temp_keyspace_name) {
1971       keyspace_name = temp_keyspace_name;
1972       keyspace = get_or_create_keyspace(keyspace_name);
1973     }
1974 
1975     const Value* names_value = row->get_by_name("field_names");
1976     if (names_value == NULL || names_value->is_null()) {
1977       LOG_ERROR("'field_name's column for keyspace \"%s\" and type \"%s\" is null",
1978                 keyspace_name.c_str(), type_name.c_str());
1979       continue;
1980     }
1981 
1982     const Value* types_value = row->get_by_name("field_types");
1983     if (types_value == NULL || types_value->is_null()) {
1984       LOG_ERROR("'field_type's column for keyspace '%s' and type '%s' is null",
1985                 keyspace_name.c_str(), type_name.c_str());
1986       continue;
1987     }
1988 
1989     CollectionIterator names(names_value);
1990     CollectionIterator types(types_value);
1991 
1992     UserType::FieldVec fields;
1993 
1994     while (names.next()) {
1995       if (!types.next()) {
1996         LOG_ERROR("The number of 'field_type's doesn\"t match the number of field_names for "
1997                   "keyspace \"%s\" and type \"%s\"",
1998                   keyspace_name.c_str(), type_name.c_str());
1999         break;
2000       }
2001 
2002       const Value* name = names.value();
2003       const Value* type = types.value();
2004 
2005       if (name->is_null() || type->is_null()) {
2006         LOG_ERROR("'field_name' or 'field_type' is null for keyspace \"%s\" and type \"%s\"",
2007                   keyspace_name.c_str(), type_name.c_str());
2008         break;
2009       }
2010 
2011       String field_name(name->to_string());
2012 
2013       DataType::ConstPtr data_type;
2014 
2015       if (server_version >= VersionNumber(3, 0, 0)) {
2016         data_type = DataTypeCqlNameParser::parse(type->to_string(), cache, keyspace);
2017       } else {
2018         data_type = DataTypeClassNameParser::parse_one(type->to_string(), cache);
2019       }
2020 
2021       if (!data_type) {
2022         LOG_ERROR("Invalid 'field_type' for field \"%s\", keyspace \"%s\" and type \"%s\"",
2023                   field_name.c_str(), keyspace_name.c_str(), type_name.c_str());
2024         break;
2025       }
2026 
2027       fields.push_back(UserType::Field(field_name, data_type));
2028     }
2029 
2030     keyspace->get_or_create_user_type(type_name, false)->set_fields(fields);
2031   }
2032 }
2033 
update_functions(const VersionNumber & server_version,SimpleDataTypeCache & cache,const ResultResponse * result)2034 void Metadata::InternalData::update_functions(const VersionNumber& server_version,
2035                                               SimpleDataTypeCache& cache,
2036                                               const ResultResponse* result) {
2037   RefBuffer::Ptr buffer = result->buffer();
2038 
2039   ResultIterator rows(result);
2040 
2041   String keyspace_name;
2042   KeyspaceMetadata* keyspace = NULL;
2043 
2044   while (rows.next()) {
2045     String temp_keyspace_name;
2046     String function_name;
2047     const Row* row = rows.row();
2048 
2049     const Value* signature = row->get_by_name(signature_column_name(server_version));
2050     if (!row->get_string_by_name("keyspace_name", &temp_keyspace_name) ||
2051         !row->get_string_by_name("function_name", &function_name) || signature == NULL) {
2052       LOG_ERROR("Unable to get column value for 'keyspace_name', 'function_name' or 'signature'");
2053       continue;
2054     }
2055 
2056     if (keyspace_name != temp_keyspace_name) {
2057       keyspace_name = temp_keyspace_name;
2058       keyspace = get_or_create_keyspace(keyspace_name);
2059     }
2060 
2061     keyspace->add_function(FunctionMetadata::Ptr(new FunctionMetadata(
2062         server_version, cache, function_name, signature, keyspace, buffer, row)));
2063   }
2064 }
2065 
update_aggregates(const VersionNumber & server_version,SimpleDataTypeCache & cache,const ResultResponse * result)2066 void Metadata::InternalData::update_aggregates(const VersionNumber& server_version,
2067                                                SimpleDataTypeCache& cache,
2068                                                const ResultResponse* result) {
2069   RefBuffer::Ptr buffer = result->buffer();
2070 
2071   ResultIterator rows(result);
2072 
2073   String keyspace_name;
2074   KeyspaceMetadata* keyspace = NULL;
2075 
2076   while (rows.next()) {
2077     String temp_keyspace_name;
2078     String aggregate_name;
2079     const Row* row = rows.row();
2080 
2081     const Value* signature = row->get_by_name(signature_column_name(server_version));
2082     if (!row->get_string_by_name("keyspace_name", &temp_keyspace_name) ||
2083         !row->get_string_by_name("aggregate_name", &aggregate_name) || signature == NULL) {
2084       LOG_ERROR("Unable to get column value for 'keyspace_name', 'aggregate_name' or 'signature'");
2085       continue;
2086     }
2087 
2088     if (keyspace_name != temp_keyspace_name) {
2089       keyspace_name = temp_keyspace_name;
2090       keyspace = get_or_create_keyspace(keyspace_name);
2091     }
2092 
2093     keyspace->add_aggregate(AggregateMetadata::Ptr(new AggregateMetadata(
2094         server_version, cache, aggregate_name, signature, keyspace, buffer, row)));
2095   }
2096 }
2097 
drop_keyspace(const String & keyspace_name)2098 void Metadata::InternalData::drop_keyspace(const String& keyspace_name) {
2099   keyspaces_->erase(keyspace_name);
2100 }
2101 
drop_table_or_view(const String & keyspace_name,const String & table_or_view_name)2102 void Metadata::InternalData::drop_table_or_view(const String& keyspace_name,
2103                                                 const String& table_or_view_name) {
2104   KeyspaceMetadata::Map::iterator i = keyspaces_->find(keyspace_name);
2105   if (i == keyspaces_->end()) return;
2106   i->second.drop_table_or_view(table_or_view_name);
2107 }
2108 
drop_user_type(const String & keyspace_name,const String & type_name)2109 void Metadata::InternalData::drop_user_type(const String& keyspace_name, const String& type_name) {
2110   KeyspaceMetadata::Map::iterator i = keyspaces_->find(keyspace_name);
2111   if (i == keyspaces_->end()) return;
2112   i->second.drop_user_type(type_name);
2113 }
2114 
drop_function(const String & keyspace_name,const String & full_function_name)2115 void Metadata::InternalData::drop_function(const String& keyspace_name,
2116                                            const String& full_function_name) {
2117   KeyspaceMetadata::Map::iterator i = keyspaces_->find(keyspace_name);
2118   if (i == keyspaces_->end()) return;
2119   i->second.drop_function(full_function_name);
2120 }
2121 
drop_aggregate(const String & keyspace_name,const String & full_aggregate_name)2122 void Metadata::InternalData::drop_aggregate(const String& keyspace_name,
2123                                             const String& full_aggregate_name) {
2124   KeyspaceMetadata::Map::iterator i = keyspaces_->find(keyspace_name);
2125   if (i == keyspaces_->end()) return;
2126   i->second.drop_aggregate(full_aggregate_name);
2127 }
2128 
update_columns(const VersionNumber & server_version,SimpleDataTypeCache & cache,const ResultResponse * result)2129 void Metadata::InternalData::update_columns(const VersionNumber& server_version,
2130                                             SimpleDataTypeCache& cache,
2131                                             const ResultResponse* result) {
2132   RefBuffer::Ptr buffer = result->buffer();
2133 
2134   ResultIterator rows(result);
2135 
2136   String keyspace_name;
2137   String table_or_view_name;
2138   String column_name;
2139 
2140   KeyspaceMetadata* keyspace = NULL;
2141   TableMetadataBase::Ptr table_or_view;
2142 
2143   while (rows.next()) {
2144     String temp_keyspace_name;
2145     String temp_table_or_view_name;
2146     const Row* row = rows.row();
2147 
2148     if (!row->get_string_by_name("keyspace_name", &temp_keyspace_name) ||
2149         !row->get_string_by_name(table_column_name(server_version), &temp_table_or_view_name) ||
2150         !row->get_string_by_name("column_name", &column_name)) {
2151       LOG_ERROR("Unable to get column value for 'keyspace_name', '%s' or 'column_name'",
2152                 table_column_name(server_version));
2153       continue;
2154     }
2155 
2156     if (keyspace_name != temp_keyspace_name) {
2157       keyspace_name = temp_keyspace_name;
2158       keyspace = get_or_create_keyspace(keyspace_name);
2159       table_or_view_name.clear();
2160     }
2161 
2162     if (table_or_view_name != temp_table_or_view_name) {
2163       // Build keys for the previous table
2164       if (table_or_view) {
2165         table_or_view->build_keys_and_sort(server_version, cache);
2166       }
2167       table_or_view_name = temp_table_or_view_name;
2168       table_or_view = TableMetadataBase::Ptr(keyspace->get_table(table_or_view_name));
2169       if (!table_or_view) {
2170         table_or_view = TableMetadataBase::Ptr(keyspace->get_view(table_or_view_name));
2171         if (!table_or_view) continue;
2172       }
2173       table_or_view->clear_columns();
2174     }
2175 
2176     if (table_or_view) {
2177       table_or_view->add_column(
2178           server_version, ColumnMetadata::Ptr(new ColumnMetadata(server_version, cache, column_name,
2179                                                                  keyspace, buffer, row)));
2180     }
2181   }
2182 
2183   // Build keys for the last table
2184   if (table_or_view) {
2185     table_or_view->build_keys_and_sort(server_version, cache);
2186   }
2187 }
2188 
update_legacy_indexes(const VersionNumber & server_version,const ResultResponse * result)2189 void Metadata::InternalData::update_legacy_indexes(const VersionNumber& server_version,
2190                                                    const ResultResponse* result) {
2191   RefBuffer::Ptr buffer = result->buffer();
2192 
2193   ResultIterator rows(result);
2194 
2195   String keyspace_name;
2196   String table_name;
2197   String column_name;
2198 
2199   KeyspaceMetadata* keyspace = NULL;
2200   TableMetadata::Ptr table;
2201 
2202   while (rows.next()) {
2203     String temp_keyspace_name;
2204     String temp_table_name;
2205     const Row* row = rows.row();
2206 
2207     if (!row->get_string_by_name("keyspace_name", &temp_keyspace_name) ||
2208         !row->get_string_by_name(table_column_name(server_version), &temp_table_name) ||
2209         !row->get_string_by_name("column_name", &column_name)) {
2210       LOG_ERROR("Unable to get column value for 'keyspace_name', '%s' or 'column_name'",
2211                 table_column_name(server_version));
2212       continue;
2213     }
2214 
2215     if (keyspace_name != temp_keyspace_name) {
2216       keyspace_name = temp_keyspace_name;
2217       keyspace = get_or_create_keyspace(keyspace_name);
2218       table_name.clear();
2219     }
2220 
2221     if (table_name != temp_table_name) {
2222       table_name = temp_table_name;
2223       table = keyspace->get_table(table_name);
2224       if (!table) continue;
2225       table->clear_indexes();
2226     }
2227 
2228     if (table) {
2229       const ColumnMetadata* column = table->get_column(column_name);
2230       if (column != NULL) {
2231         const Value* index_type = column->get_field("index_type");
2232         if (index_type != NULL && index_type->value_type() == CASS_VALUE_TYPE_VARCHAR) {
2233           String index_name = column->get_string_field("index_name");
2234           table->add_index(IndexMetadata::from_legacy(index_name, column, buffer, row));
2235         }
2236       }
2237     }
2238   }
2239 }
2240 
update_indexes(const VersionNumber & server_version,const ResultResponse * result)2241 void Metadata::InternalData::update_indexes(const VersionNumber& server_version,
2242                                             const ResultResponse* result) {
2243   RefBuffer::Ptr buffer = result->buffer();
2244 
2245   ResultIterator rows(result);
2246 
2247   String keyspace_name;
2248   String table_name;
2249   String index_name;
2250 
2251   KeyspaceMetadata* keyspace = NULL;
2252   TableMetadata::Ptr table;
2253 
2254   while (rows.next()) {
2255     String temp_keyspace_name;
2256     String temp_table_name;
2257     const Row* row = rows.row();
2258 
2259     if (!row->get_string_by_name("keyspace_name", &temp_keyspace_name) ||
2260         !row->get_string_by_name("table_name", &temp_table_name) ||
2261         !row->get_string_by_name("index_name", &index_name)) {
2262       LOG_ERROR("Unable to get column value for 'keyspace_name', 'table_name' or 'index_name'");
2263       continue;
2264     }
2265 
2266     if (keyspace_name != temp_keyspace_name) {
2267       keyspace_name = temp_keyspace_name;
2268       keyspace = get_or_create_keyspace(keyspace_name);
2269       table_name.clear();
2270     }
2271 
2272     if (table_name != temp_table_name) {
2273       table_name = temp_table_name;
2274       table = keyspace->get_table(table_name);
2275       if (!table) continue;
2276       table->clear_indexes();
2277     }
2278 
2279     table->add_index(IndexMetadata::from_row(index_name, buffer, row));
2280   }
2281 }
2282 
get_or_create_keyspace(const String & name,bool is_virtual)2283 KeyspaceMetadata* Metadata::InternalData::get_or_create_keyspace(const String& name,
2284                                                                  bool is_virtual) {
2285   KeyspaceMetadata::Map::iterator i = keyspaces_->find(name);
2286   if (i == keyspaces_->end()) {
2287     i = keyspaces_->insert(std::make_pair(name, KeyspaceMetadata(name, is_virtual))).first;
2288   }
2289   return &i->second;
2290 }
2291