1 /*
2   Copyright (c) DataStax, Inc.
3 
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7 
8   http://www.apache.org/licenses/LICENSE-2.0
9 
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15 */
16 
17 #include "result_response.hpp"
18 
19 #include "external.hpp"
20 #include "logger.hpp"
21 #include "protocol.hpp"
22 #include "result_metadata.hpp"
23 #include "result_response.hpp"
24 #include "serialization.hpp"
25 
26 using namespace datastax;
27 using namespace datastax::internal;
28 using namespace datastax::internal::core;
29 
30 extern "C" {
31 
cass_result_free(const CassResult * result)32 void cass_result_free(const CassResult* result) { result->dec_ref(); }
33 
cass_result_row_count(const CassResult * result)34 size_t cass_result_row_count(const CassResult* result) {
35   if (result->kind() == CASS_RESULT_KIND_ROWS) {
36     return result->row_count();
37   }
38   return 0;
39 }
40 
cass_result_column_count(const CassResult * result)41 size_t cass_result_column_count(const CassResult* result) {
42   if (result->kind() == CASS_RESULT_KIND_ROWS) {
43     return result->column_count();
44   }
45   return 0;
46 }
47 
cass_result_column_name(const CassResult * result,size_t index,const char ** name,size_t * name_length)48 CassError cass_result_column_name(const CassResult* result, size_t index, const char** name,
49                                   size_t* name_length) {
50   const SharedRefPtr<ResultMetadata>& metadata(result->metadata());
51   if (index >= metadata->column_count()) {
52     return CASS_ERROR_LIB_INDEX_OUT_OF_BOUNDS;
53   }
54   if (result->kind() != CASS_RESULT_KIND_ROWS) {
55     return CASS_ERROR_LIB_BAD_PARAMS;
56   }
57   const ColumnDefinition def = metadata->get_column_definition(index);
58   *name = def.name.data();
59   *name_length = def.name.size();
60   return CASS_OK;
61 }
62 
cass_result_column_type(const CassResult * result,size_t index)63 CassValueType cass_result_column_type(const CassResult* result, size_t index) {
64   const SharedRefPtr<ResultMetadata>& metadata(result->metadata());
65   if (result->kind() == CASS_RESULT_KIND_ROWS && index < metadata->column_count()) {
66     return metadata->get_column_definition(index).data_type->value_type();
67   }
68   return CASS_VALUE_TYPE_UNKNOWN;
69 }
70 
cass_result_column_data_type(const CassResult * result,size_t index)71 const CassDataType* cass_result_column_data_type(const CassResult* result, size_t index) {
72   const SharedRefPtr<ResultMetadata>& metadata(result->metadata());
73   if (result->kind() == CASS_RESULT_KIND_ROWS && index < metadata->column_count()) {
74     return CassDataType::to(metadata->get_column_definition(index).data_type.get());
75   }
76   return NULL;
77 }
78 
cass_result_first_row(const CassResult * result)79 const CassRow* cass_result_first_row(const CassResult* result) {
80   if (result->kind() == CASS_RESULT_KIND_ROWS && result->row_count() > 0) {
81     return CassRow::to(&result->first_row());
82   }
83   return NULL;
84 }
85 
cass_result_has_more_pages(const CassResult * result)86 cass_bool_t cass_result_has_more_pages(const CassResult* result) {
87   return static_cast<cass_bool_t>(result->has_more_pages());
88 }
89 
cass_result_paging_state_token(const CassResult * result,const char ** paging_state,size_t * paging_state_size)90 CassError cass_result_paging_state_token(const CassResult* result, const char** paging_state,
91                                          size_t* paging_state_size) {
92   if (!result->has_more_pages()) {
93     return CASS_ERROR_LIB_NO_PAGING_STATE;
94   }
95   *paging_state = result->paging_state().data();
96   *paging_state_size = result->paging_state().size();
97   return CASS_OK;
98 }
99 
100 } // extern "C"
101 
102 class DataTypeDecoder {
103 public:
DataTypeDecoder(Decoder & decoder,SimpleDataTypeCache & cache)104   DataTypeDecoder(Decoder& decoder, SimpleDataTypeCache& cache)
105       : decoder_(decoder)
106       , cache_(cache) {}
107 
decode()108   DataType::ConstPtr decode() {
109     decoder_.set_type("data type");
110     uint16_t value_type;
111     if (!decoder_.decode_uint16(value_type)) return DataType::NIL;
112 
113     switch (value_type) {
114       case CASS_VALUE_TYPE_CUSTOM:
115         return decode_custom();
116 
117       case CASS_VALUE_TYPE_LIST:
118       case CASS_VALUE_TYPE_SET:
119       case CASS_VALUE_TYPE_MAP:
120         return decode_collection(static_cast<CassValueType>(value_type));
121 
122       case CASS_VALUE_TYPE_UDT:
123         return decode_user_type();
124 
125       case CASS_VALUE_TYPE_TUPLE:
126         return decode_tuple();
127 
128       default:
129         return cache_.by_value_type(value_type);
130     }
131   }
132 
133 private:
decode_custom()134   DataType::ConstPtr decode_custom() {
135     StringRef class_name;
136     if (!decoder_.decode_string(&class_name)) return DataType::NIL;
137 
138     DataType::ConstPtr type = cache_.by_class(class_name);
139     if (type) return type;
140 
141     // If no mapping exists, return an actual custom type.
142     return DataType::ConstPtr(new CustomType(class_name.to_string()));
143   }
144 
decode_collection(CassValueType collection_type)145   DataType::ConstPtr decode_collection(CassValueType collection_type) {
146     DataType::Vec types;
147     types.push_back(decode());
148     if (collection_type == CASS_VALUE_TYPE_MAP) {
149       types.push_back(decode());
150     }
151     return DataType::ConstPtr(new CollectionType(collection_type, types, false));
152   }
153 
decode_user_type()154   DataType::ConstPtr decode_user_type() {
155     StringRef keyspace;
156     if (!decoder_.decode_string(&keyspace)) return DataType::NIL;
157 
158     StringRef type_name;
159     if (!decoder_.decode_string(&type_name)) return DataType::NIL;
160 
161     uint16_t n;
162     if (!decoder_.decode_uint16(n)) return DataType::NIL;
163 
164     UserType::FieldVec fields;
165     for (uint16_t i = 0; i < n; ++i) {
166       StringRef field_name;
167       if (!decoder_.decode_string(&field_name)) return DataType::NIL;
168       fields.push_back(UserType::Field(field_name.to_string(), decode()));
169     }
170     return DataType::ConstPtr(
171         new UserType(keyspace.to_string(), type_name.to_string(), fields, false));
172   }
173 
decode_tuple()174   DataType::ConstPtr decode_tuple() {
175     uint16_t n;
176     if (!decoder_.decode_uint16(n)) return DataType::NIL;
177 
178     DataType::Vec types;
179     for (uint16_t i = 0; i < n; ++i) {
180       types.push_back(decode());
181     }
182     return DataType::ConstPtr(new TupleType(types, false));
183   }
184 
185 private:
186   Decoder& decoder_;
187   SimpleDataTypeCache& cache_;
188 };
189 
set_metadata(const ResultMetadata::Ptr & metadata)190 void ResultResponse::set_metadata(const ResultMetadata::Ptr& metadata) {
191   metadata_ = metadata;
192   decode_first_row();
193 }
194 
decode(Decoder & decoder)195 bool ResultResponse::decode(Decoder& decoder) {
196   protocol_version_ = decoder.protocol_version();
197   decoder.set_type("result");
198   bool is_valid = false;
199 
200   CHECK_RESULT(decoder.decode_int32(kind_));
201 
202   switch (kind_) {
203     case CASS_RESULT_KIND_VOID:
204       is_valid = true;
205       break;
206 
207     case CASS_RESULT_KIND_ROWS:
208       is_valid = decode_rows(decoder);
209       break;
210 
211     case CASS_RESULT_KIND_SET_KEYSPACE:
212       is_valid = decode_set_keyspace(decoder);
213       break;
214 
215     case CASS_RESULT_KIND_PREPARED:
216       is_valid = decode_prepared(decoder);
217       break;
218 
219     case CASS_RESULT_KIND_SCHEMA_CHANGE:
220       is_valid = decode_schema_change(decoder);
221       break;
222 
223     default:
224       assert(false);
225       break;
226   }
227 
228   if (!is_valid) decoder.maybe_log_remaining();
229   return is_valid;
230 }
231 
decode_metadata(Decoder & decoder,ResultMetadata::Ptr * metadata,bool has_pk_indices)232 bool ResultResponse::decode_metadata(Decoder& decoder, ResultMetadata::Ptr* metadata,
233                                      bool has_pk_indices) {
234   int32_t flags = 0;
235   CHECK_RESULT(decoder.decode_int32(flags));
236 
237   int32_t column_count = 0;
238   CHECK_RESULT(decoder.decode_int32(column_count));
239 
240   if (flags & CASS_RESULT_FLAG_METADATA_CHANGED) {
241     if (decoder.protocol_version().supports_result_metadata_id()) {
242       CHECK_RESULT(decoder.decode_string(&new_metadata_id_))
243     } else {
244       LOG_ERROR("Metadata changed flag set with invalid protocol version %s",
245                 decoder.protocol_version().to_string().c_str());
246       return false;
247     }
248   }
249 
250   if (has_pk_indices) {
251     int32_t pk_count = 0;
252     CHECK_RESULT(decoder.decode_int32(pk_count));
253     for (int i = 0; i < pk_count; ++i) {
254       uint16_t pk_index = 0;
255       CHECK_RESULT(decoder.decode_uint16(pk_index));
256       pk_indices_.push_back(pk_index);
257     }
258   }
259 
260   if (flags & CASS_RESULT_FLAG_HAS_MORE_PAGES) {
261     has_more_pages_ = true;
262     CHECK_RESULT(decoder.decode_bytes(&paging_state_));
263   } else {
264     has_more_pages_ = false;
265   }
266 
267   if (!(flags & CASS_RESULT_FLAG_NO_METADATA)) {
268     bool global_table_spec = flags & CASS_RESULT_FLAG_GLOBAL_TABLESPEC;
269 
270     if (global_table_spec) {
271       CHECK_RESULT(decoder.decode_string(&keyspace_));
272       CHECK_RESULT(decoder.decode_string(&table_));
273     }
274 
275     metadata->reset(new ResultMetadata(column_count, this->buffer()));
276 
277     SimpleDataTypeCache cache;
278 
279     for (int i = 0; i < column_count; ++i) {
280       ColumnDefinition def;
281 
282       def.index = i;
283 
284       if (!global_table_spec) {
285         CHECK_RESULT(decoder.decode_string(&def.keyspace));
286         CHECK_RESULT(decoder.decode_string(&def.table));
287       }
288 
289       CHECK_RESULT(decoder.decode_string(&def.name));
290 
291       DataTypeDecoder type_decoder(decoder, cache);
292       def.data_type = DataType::ConstPtr(type_decoder.decode());
293       if (def.data_type == DataType::NIL) return false;
294 
295       (*metadata)->add(def);
296     }
297   }
298   return true;
299 }
300 
decode_first_row()301 bool ResultResponse::decode_first_row() {
302   if (row_count_ > 0 && metadata_ && // Valid metadata required for column count
303       first_row_.values.empty()) {   // Only decode the first row once
304     first_row_.values.reserve(column_count());
305     return decode_row(row_decoder_, this, first_row_.values);
306   }
307   return true;
308 }
309 
decode_rows(Decoder & decoder)310 bool ResultResponse::decode_rows(Decoder& decoder) {
311   CHECK_RESULT(decode_metadata(decoder, &metadata_));
312   CHECK_RESULT(decoder.decode_int32(row_count_));
313   row_decoder_ = decoder;
314   CHECK_RESULT(decode_first_row());
315   return true;
316 }
317 
decode_set_keyspace(Decoder & decoder)318 bool ResultResponse::decode_set_keyspace(Decoder& decoder) {
319   CHECK_RESULT(decoder.decode_string(&keyspace_));
320   return true;
321 }
322 
decode_prepared(Decoder & decoder)323 bool ResultResponse::decode_prepared(Decoder& decoder) {
324   CHECK_RESULT(decoder.decode_string(&prepared_id_));
325   if (decoder.protocol_version().supports_result_metadata_id()) {
326     CHECK_RESULT(decoder.decode_string(&result_metadata_id_));
327   }
328   CHECK_RESULT(
329       decode_metadata(decoder, &metadata_, decoder.protocol_version() >= CASS_PROTOCOL_VERSION_V4));
330   CHECK_RESULT(decode_metadata(decoder, &result_metadata_));
331   return true;
332 }
333 
decode_schema_change(Decoder & decoder)334 bool ResultResponse::decode_schema_change(Decoder& decoder) {
335   CHECK_RESULT(decoder.decode_string(&change_));
336   CHECK_RESULT(decoder.decode_string(&keyspace_));
337   CHECK_RESULT(decoder.decode_string(&table_));
338   return true;
339 }
340