1 /*
2 Copyright (c) DataStax, Inc.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #include "result_response.hpp"
18
19 #include "external.hpp"
20 #include "logger.hpp"
21 #include "protocol.hpp"
22 #include "result_metadata.hpp"
23 #include "result_response.hpp"
24 #include "serialization.hpp"
25
26 using namespace datastax;
27 using namespace datastax::internal;
28 using namespace datastax::internal::core;
29
30 extern "C" {
31
cass_result_free(const CassResult * result)32 void cass_result_free(const CassResult* result) { result->dec_ref(); }
33
cass_result_row_count(const CassResult * result)34 size_t cass_result_row_count(const CassResult* result) {
35 if (result->kind() == CASS_RESULT_KIND_ROWS) {
36 return result->row_count();
37 }
38 return 0;
39 }
40
cass_result_column_count(const CassResult * result)41 size_t cass_result_column_count(const CassResult* result) {
42 if (result->kind() == CASS_RESULT_KIND_ROWS) {
43 return result->column_count();
44 }
45 return 0;
46 }
47
cass_result_column_name(const CassResult * result,size_t index,const char ** name,size_t * name_length)48 CassError cass_result_column_name(const CassResult* result, size_t index, const char** name,
49 size_t* name_length) {
50 const SharedRefPtr<ResultMetadata>& metadata(result->metadata());
51 if (index >= metadata->column_count()) {
52 return CASS_ERROR_LIB_INDEX_OUT_OF_BOUNDS;
53 }
54 if (result->kind() != CASS_RESULT_KIND_ROWS) {
55 return CASS_ERROR_LIB_BAD_PARAMS;
56 }
57 const ColumnDefinition def = metadata->get_column_definition(index);
58 *name = def.name.data();
59 *name_length = def.name.size();
60 return CASS_OK;
61 }
62
cass_result_column_type(const CassResult * result,size_t index)63 CassValueType cass_result_column_type(const CassResult* result, size_t index) {
64 const SharedRefPtr<ResultMetadata>& metadata(result->metadata());
65 if (result->kind() == CASS_RESULT_KIND_ROWS && index < metadata->column_count()) {
66 return metadata->get_column_definition(index).data_type->value_type();
67 }
68 return CASS_VALUE_TYPE_UNKNOWN;
69 }
70
cass_result_column_data_type(const CassResult * result,size_t index)71 const CassDataType* cass_result_column_data_type(const CassResult* result, size_t index) {
72 const SharedRefPtr<ResultMetadata>& metadata(result->metadata());
73 if (result->kind() == CASS_RESULT_KIND_ROWS && index < metadata->column_count()) {
74 return CassDataType::to(metadata->get_column_definition(index).data_type.get());
75 }
76 return NULL;
77 }
78
cass_result_first_row(const CassResult * result)79 const CassRow* cass_result_first_row(const CassResult* result) {
80 if (result->kind() == CASS_RESULT_KIND_ROWS && result->row_count() > 0) {
81 return CassRow::to(&result->first_row());
82 }
83 return NULL;
84 }
85
cass_result_has_more_pages(const CassResult * result)86 cass_bool_t cass_result_has_more_pages(const CassResult* result) {
87 return static_cast<cass_bool_t>(result->has_more_pages());
88 }
89
cass_result_paging_state_token(const CassResult * result,const char ** paging_state,size_t * paging_state_size)90 CassError cass_result_paging_state_token(const CassResult* result, const char** paging_state,
91 size_t* paging_state_size) {
92 if (!result->has_more_pages()) {
93 return CASS_ERROR_LIB_NO_PAGING_STATE;
94 }
95 *paging_state = result->paging_state().data();
96 *paging_state_size = result->paging_state().size();
97 return CASS_OK;
98 }
99
100 } // extern "C"
101
102 class DataTypeDecoder {
103 public:
DataTypeDecoder(Decoder & decoder,SimpleDataTypeCache & cache)104 DataTypeDecoder(Decoder& decoder, SimpleDataTypeCache& cache)
105 : decoder_(decoder)
106 , cache_(cache) {}
107
decode()108 DataType::ConstPtr decode() {
109 decoder_.set_type("data type");
110 uint16_t value_type;
111 if (!decoder_.decode_uint16(value_type)) return DataType::NIL;
112
113 switch (value_type) {
114 case CASS_VALUE_TYPE_CUSTOM:
115 return decode_custom();
116
117 case CASS_VALUE_TYPE_LIST:
118 case CASS_VALUE_TYPE_SET:
119 case CASS_VALUE_TYPE_MAP:
120 return decode_collection(static_cast<CassValueType>(value_type));
121
122 case CASS_VALUE_TYPE_UDT:
123 return decode_user_type();
124
125 case CASS_VALUE_TYPE_TUPLE:
126 return decode_tuple();
127
128 default:
129 return cache_.by_value_type(value_type);
130 }
131 }
132
133 private:
decode_custom()134 DataType::ConstPtr decode_custom() {
135 StringRef class_name;
136 if (!decoder_.decode_string(&class_name)) return DataType::NIL;
137
138 DataType::ConstPtr type = cache_.by_class(class_name);
139 if (type) return type;
140
141 // If no mapping exists, return an actual custom type.
142 return DataType::ConstPtr(new CustomType(class_name.to_string()));
143 }
144
decode_collection(CassValueType collection_type)145 DataType::ConstPtr decode_collection(CassValueType collection_type) {
146 DataType::Vec types;
147 types.push_back(decode());
148 if (collection_type == CASS_VALUE_TYPE_MAP) {
149 types.push_back(decode());
150 }
151 return DataType::ConstPtr(new CollectionType(collection_type, types, false));
152 }
153
decode_user_type()154 DataType::ConstPtr decode_user_type() {
155 StringRef keyspace;
156 if (!decoder_.decode_string(&keyspace)) return DataType::NIL;
157
158 StringRef type_name;
159 if (!decoder_.decode_string(&type_name)) return DataType::NIL;
160
161 uint16_t n;
162 if (!decoder_.decode_uint16(n)) return DataType::NIL;
163
164 UserType::FieldVec fields;
165 for (uint16_t i = 0; i < n; ++i) {
166 StringRef field_name;
167 if (!decoder_.decode_string(&field_name)) return DataType::NIL;
168 fields.push_back(UserType::Field(field_name.to_string(), decode()));
169 }
170 return DataType::ConstPtr(
171 new UserType(keyspace.to_string(), type_name.to_string(), fields, false));
172 }
173
decode_tuple()174 DataType::ConstPtr decode_tuple() {
175 uint16_t n;
176 if (!decoder_.decode_uint16(n)) return DataType::NIL;
177
178 DataType::Vec types;
179 for (uint16_t i = 0; i < n; ++i) {
180 types.push_back(decode());
181 }
182 return DataType::ConstPtr(new TupleType(types, false));
183 }
184
185 private:
186 Decoder& decoder_;
187 SimpleDataTypeCache& cache_;
188 };
189
set_metadata(const ResultMetadata::Ptr & metadata)190 void ResultResponse::set_metadata(const ResultMetadata::Ptr& metadata) {
191 metadata_ = metadata;
192 decode_first_row();
193 }
194
decode(Decoder & decoder)195 bool ResultResponse::decode(Decoder& decoder) {
196 protocol_version_ = decoder.protocol_version();
197 decoder.set_type("result");
198 bool is_valid = false;
199
200 CHECK_RESULT(decoder.decode_int32(kind_));
201
202 switch (kind_) {
203 case CASS_RESULT_KIND_VOID:
204 is_valid = true;
205 break;
206
207 case CASS_RESULT_KIND_ROWS:
208 is_valid = decode_rows(decoder);
209 break;
210
211 case CASS_RESULT_KIND_SET_KEYSPACE:
212 is_valid = decode_set_keyspace(decoder);
213 break;
214
215 case CASS_RESULT_KIND_PREPARED:
216 is_valid = decode_prepared(decoder);
217 break;
218
219 case CASS_RESULT_KIND_SCHEMA_CHANGE:
220 is_valid = decode_schema_change(decoder);
221 break;
222
223 default:
224 assert(false);
225 break;
226 }
227
228 if (!is_valid) decoder.maybe_log_remaining();
229 return is_valid;
230 }
231
decode_metadata(Decoder & decoder,ResultMetadata::Ptr * metadata,bool has_pk_indices)232 bool ResultResponse::decode_metadata(Decoder& decoder, ResultMetadata::Ptr* metadata,
233 bool has_pk_indices) {
234 int32_t flags = 0;
235 CHECK_RESULT(decoder.decode_int32(flags));
236
237 int32_t column_count = 0;
238 CHECK_RESULT(decoder.decode_int32(column_count));
239
240 if (flags & CASS_RESULT_FLAG_METADATA_CHANGED) {
241 if (decoder.protocol_version().supports_result_metadata_id()) {
242 CHECK_RESULT(decoder.decode_string(&new_metadata_id_))
243 } else {
244 LOG_ERROR("Metadata changed flag set with invalid protocol version %s",
245 decoder.protocol_version().to_string().c_str());
246 return false;
247 }
248 }
249
250 if (has_pk_indices) {
251 int32_t pk_count = 0;
252 CHECK_RESULT(decoder.decode_int32(pk_count));
253 for (int i = 0; i < pk_count; ++i) {
254 uint16_t pk_index = 0;
255 CHECK_RESULT(decoder.decode_uint16(pk_index));
256 pk_indices_.push_back(pk_index);
257 }
258 }
259
260 if (flags & CASS_RESULT_FLAG_HAS_MORE_PAGES) {
261 has_more_pages_ = true;
262 CHECK_RESULT(decoder.decode_bytes(&paging_state_));
263 } else {
264 has_more_pages_ = false;
265 }
266
267 if (!(flags & CASS_RESULT_FLAG_NO_METADATA)) {
268 bool global_table_spec = flags & CASS_RESULT_FLAG_GLOBAL_TABLESPEC;
269
270 if (global_table_spec) {
271 CHECK_RESULT(decoder.decode_string(&keyspace_));
272 CHECK_RESULT(decoder.decode_string(&table_));
273 }
274
275 metadata->reset(new ResultMetadata(column_count, this->buffer()));
276
277 SimpleDataTypeCache cache;
278
279 for (int i = 0; i < column_count; ++i) {
280 ColumnDefinition def;
281
282 def.index = i;
283
284 if (!global_table_spec) {
285 CHECK_RESULT(decoder.decode_string(&def.keyspace));
286 CHECK_RESULT(decoder.decode_string(&def.table));
287 }
288
289 CHECK_RESULT(decoder.decode_string(&def.name));
290
291 DataTypeDecoder type_decoder(decoder, cache);
292 def.data_type = DataType::ConstPtr(type_decoder.decode());
293 if (def.data_type == DataType::NIL) return false;
294
295 (*metadata)->add(def);
296 }
297 }
298 return true;
299 }
300
decode_first_row()301 bool ResultResponse::decode_first_row() {
302 if (row_count_ > 0 && metadata_ && // Valid metadata required for column count
303 first_row_.values.empty()) { // Only decode the first row once
304 first_row_.values.reserve(column_count());
305 return decode_row(row_decoder_, this, first_row_.values);
306 }
307 return true;
308 }
309
decode_rows(Decoder & decoder)310 bool ResultResponse::decode_rows(Decoder& decoder) {
311 CHECK_RESULT(decode_metadata(decoder, &metadata_));
312 CHECK_RESULT(decoder.decode_int32(row_count_));
313 row_decoder_ = decoder;
314 CHECK_RESULT(decode_first_row());
315 return true;
316 }
317
decode_set_keyspace(Decoder & decoder)318 bool ResultResponse::decode_set_keyspace(Decoder& decoder) {
319 CHECK_RESULT(decoder.decode_string(&keyspace_));
320 return true;
321 }
322
decode_prepared(Decoder & decoder)323 bool ResultResponse::decode_prepared(Decoder& decoder) {
324 CHECK_RESULT(decoder.decode_string(&prepared_id_));
325 if (decoder.protocol_version().supports_result_metadata_id()) {
326 CHECK_RESULT(decoder.decode_string(&result_metadata_id_));
327 }
328 CHECK_RESULT(
329 decode_metadata(decoder, &metadata_, decoder.protocol_version() >= CASS_PROTOCOL_VERSION_V4));
330 CHECK_RESULT(decode_metadata(decoder, &result_metadata_));
331 return true;
332 }
333
decode_schema_change(Decoder & decoder)334 bool ResultResponse::decode_schema_change(Decoder& decoder) {
335 CHECK_RESULT(decoder.decode_string(&change_));
336 CHECK_RESULT(decoder.decode_string(&keyspace_));
337 CHECK_RESULT(decoder.decode_string(&table_));
338 return true;
339 }
340