1 /*
2   Copyright (c) DataStax, Inc.
3 
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7 
8   http://www.apache.org/licenses/LICENSE-2.0
9 
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15 */
16 
17 #include "statement.hpp"
18 
19 #include "collection.hpp"
20 #include "execute_request.hpp"
21 #include "external.hpp"
22 #include "macros.hpp"
23 #include "prepared.hpp"
24 #include "protocol.hpp"
25 #include "query_request.hpp"
26 #include "request_callback.hpp"
27 #include "scoped_ptr.hpp"
28 #include "string_ref.hpp"
29 #include "tuple.hpp"
30 #include "user_type_value.hpp"
31 
32 #include <uv.h>
33 
34 using namespace datastax;
35 using namespace datastax::internal;
36 using namespace datastax::internal::core;
37 
38 extern "C" {
39 
cass_statement_new(const char * query,size_t parameter_count)40 CassStatement* cass_statement_new(const char* query, size_t parameter_count) {
41   return cass_statement_new_n(query, SAFE_STRLEN(query), parameter_count);
42 }
43 
cass_statement_new_n(const char * query,size_t query_length,size_t parameter_count)44 CassStatement* cass_statement_new_n(const char* query, size_t query_length,
45                                     size_t parameter_count) {
46   QueryRequest* query_request = new QueryRequest(query, query_length, parameter_count);
47   query_request->inc_ref();
48   return CassStatement::to(query_request);
49 }
50 
cass_statement_reset_parameters(CassStatement * statement,size_t count)51 CassError cass_statement_reset_parameters(CassStatement* statement, size_t count) {
52   statement->reset(count);
53   return CASS_OK;
54 }
55 
cass_statement_add_key_index(CassStatement * statement,size_t index)56 CassError cass_statement_add_key_index(CassStatement* statement, size_t index) {
57   if (statement->kind() != CASS_BATCH_KIND_QUERY) return CASS_ERROR_LIB_BAD_PARAMS;
58   if (index >= statement->elements().size()) return CASS_ERROR_LIB_BAD_PARAMS;
59   statement->add_key_index(index);
60   return CASS_OK;
61 }
62 
cass_statement_set_keyspace(CassStatement * statement,const char * keyspace)63 CassError cass_statement_set_keyspace(CassStatement* statement, const char* keyspace) {
64   return cass_statement_set_keyspace_n(statement, keyspace, SAFE_STRLEN(keyspace));
65 }
66 
cass_statement_set_keyspace_n(CassStatement * statement,const char * keyspace,size_t keyspace_length)67 CassError cass_statement_set_keyspace_n(CassStatement* statement, const char* keyspace,
68                                         size_t keyspace_length) {
69   // The keyspace is set by the prepared metadata
70   if (statement->opcode() == CQL_OPCODE_EXECUTE) {
71     return CASS_ERROR_LIB_BAD_PARAMS;
72   }
73   statement->set_keyspace(String(keyspace, keyspace_length));
74   return CASS_OK;
75 }
76 
cass_statement_free(CassStatement * statement)77 void cass_statement_free(CassStatement* statement) { statement->dec_ref(); }
78 
cass_statement_set_consistency(CassStatement * statement,CassConsistency consistency)79 CassError cass_statement_set_consistency(CassStatement* statement, CassConsistency consistency) {
80   statement->set_consistency(consistency);
81   return CASS_OK;
82 }
83 
cass_statement_set_serial_consistency(CassStatement * statement,CassConsistency serial_consistency)84 CassError cass_statement_set_serial_consistency(CassStatement* statement,
85                                                 CassConsistency serial_consistency) {
86   statement->set_serial_consistency(serial_consistency);
87   return CASS_OK;
88 }
89 
cass_statement_set_paging_size(CassStatement * statement,int page_size)90 CassError cass_statement_set_paging_size(CassStatement* statement, int page_size) {
91   statement->set_page_size(page_size);
92   return CASS_OK;
93 }
94 
cass_statement_set_paging_state(CassStatement * statement,const CassResult * result)95 CassError cass_statement_set_paging_state(CassStatement* statement, const CassResult* result) {
96   statement->set_paging_state(result->paging_state().to_string());
97   return CASS_OK;
98 }
99 
cass_statement_set_paging_state_token(CassStatement * statement,const char * paging_state,size_t paging_state_size)100 CassError cass_statement_set_paging_state_token(CassStatement* statement, const char* paging_state,
101                                                 size_t paging_state_size) {
102   statement->set_paging_state(String(paging_state, paging_state_size));
103   return CASS_OK;
104 }
105 
cass_statement_set_retry_policy(CassStatement * statement,CassRetryPolicy * retry_policy)106 CassError cass_statement_set_retry_policy(CassStatement* statement, CassRetryPolicy* retry_policy) {
107   statement->set_retry_policy(retry_policy);
108   return CASS_OK;
109 }
110 
cass_statement_set_timestamp(CassStatement * statement,cass_int64_t timestamp)111 CassError cass_statement_set_timestamp(CassStatement* statement, cass_int64_t timestamp) {
112   statement->set_timestamp(timestamp);
113   return CASS_OK;
114 }
115 
cass_statement_set_request_timeout(CassStatement * statement,cass_uint64_t timeout_ms)116 CassError cass_statement_set_request_timeout(CassStatement* statement, cass_uint64_t timeout_ms) {
117   statement->set_request_timeout_ms(timeout_ms);
118   return CASS_OK;
119 }
120 
cass_statement_set_is_idempotent(CassStatement * statement,cass_bool_t is_idempotent)121 CassError cass_statement_set_is_idempotent(CassStatement* statement, cass_bool_t is_idempotent) {
122   statement->set_is_idempotent(is_idempotent == cass_true);
123   return CASS_OK;
124 }
125 
cass_statement_set_custom_payload(CassStatement * statement,const CassCustomPayload * payload)126 CassError cass_statement_set_custom_payload(CassStatement* statement,
127                                             const CassCustomPayload* payload) {
128   statement->set_custom_payload(payload);
129   return CASS_OK;
130 }
131 
cass_statement_set_execution_profile(CassStatement * statement,const char * name)132 CassError cass_statement_set_execution_profile(CassStatement* statement, const char* name) {
133   return cass_statement_set_execution_profile_n(statement, name, SAFE_STRLEN(name));
134 }
135 
cass_statement_set_execution_profile_n(CassStatement * statement,const char * name,size_t name_length)136 CassError cass_statement_set_execution_profile_n(CassStatement* statement, const char* name,
137                                                  size_t name_length) {
138   if (name_length > 0) {
139     statement->set_execution_profile_name(String(name, name_length));
140   } else {
141     statement->set_execution_profile_name(String());
142   }
143   return CASS_OK;
144 }
145 
cass_statement_set_tracing(CassStatement * statement,cass_bool_t enabled)146 CassError cass_statement_set_tracing(CassStatement* statement, cass_bool_t enabled) {
147   statement->set_tracing(enabled == cass_true);
148   return CASS_OK;
149 }
150 
cass_statement_set_host(CassStatement * statement,const char * host,int port)151 CassError cass_statement_set_host(CassStatement* statement, const char* host, int port) {
152   return cass_statement_set_host_n(statement, host, SAFE_STRLEN(host), port);
153 }
154 
cass_statement_set_host_n(CassStatement * statement,const char * host,size_t host_length,int port)155 CassError cass_statement_set_host_n(CassStatement* statement, const char* host, size_t host_length,
156                                     int port) {
157   Address address(String(host, host_length), port);
158   if (!address.is_valid_and_resolved()) {
159     return CASS_ERROR_LIB_BAD_PARAMS;
160   }
161   statement->set_host(address);
162   return CASS_OK;
163 }
164 
cass_statement_set_host_inet(CassStatement * statement,const CassInet * host,int port)165 CassError cass_statement_set_host_inet(CassStatement* statement, const CassInet* host, int port) {
166   Address address(host->address, host->address_length, port);
167   if (!address.is_valid_and_resolved()) {
168     return CASS_ERROR_LIB_BAD_PARAMS;
169   }
170   statement->set_host(address);
171   return CASS_OK;
172 }
173 
cass_statement_set_node(CassStatement * statement,const CassNode * node)174 CassError cass_statement_set_node(CassStatement* statement, const CassNode* node) {
175   if (node == NULL) {
176     return CASS_ERROR_LIB_BAD_PARAMS;
177   }
178   statement->set_host(*node->from());
179   return CASS_OK;
180 }
181 
// Generates the three C API entry points that bind one value type:
//   cass_statement_bind_<Name>(statement, index, ...)
//   cass_statement_bind_<Name>_by_name(statement, name, ...)
//   cass_statement_bind_<Name>_by_name_n(statement, name, name_length, ...)
// `Params` is pasted directly after the last fixed parameter, so it must
// expand to the extra value parameter(s) including any leading comma (the
// ZERO_PARAMS_/ONE_PARAM_/TWO_PARAMS_/THREE_PARAMS_ helpers are defined
// outside this file -- presumably in macros.hpp). `Value` is the expression
// passed to statement->set(); each generated function returns its result.
#define CASS_STATEMENT_BIND(Name, Params, Value)                                               \
  CassError cass_statement_bind_##Name(CassStatement* statement, size_t index Params) {        \
    return statement->set(index, Value);                                                       \
  }                                                                                            \
  CassError cass_statement_bind_##Name##_by_name(CassStatement* statement,                     \
                                                 const char* name Params) {                    \
    return statement->set(StringRef(name), Value);                                             \
  }                                                                                            \
  CassError cass_statement_bind_##Name##_by_name_n(CassStatement* statement, const char* name, \
                                                   size_t name_length Params) {                \
    return statement->set(StringRef(name, name_length), Value);                                \
  }

// Scalar types are passed straight through as `value`.
CASS_STATEMENT_BIND(null, ZERO_PARAMS_(), CassNull())
CASS_STATEMENT_BIND(int8, ONE_PARAM_(cass_int8_t value), value)
CASS_STATEMENT_BIND(int16, ONE_PARAM_(cass_int16_t value), value)
CASS_STATEMENT_BIND(int32, ONE_PARAM_(cass_int32_t value), value)
CASS_STATEMENT_BIND(uint32, ONE_PARAM_(cass_uint32_t value), value)
CASS_STATEMENT_BIND(int64, ONE_PARAM_(cass_int64_t value), value)
CASS_STATEMENT_BIND(float, ONE_PARAM_(cass_float_t value), value)
CASS_STATEMENT_BIND(double, ONE_PARAM_(cass_double_t value), value)
CASS_STATEMENT_BIND(bool, ONE_PARAM_(cass_bool_t value), value)
CASS_STATEMENT_BIND(uuid, ONE_PARAM_(CassUuid value), value)
CASS_STATEMENT_BIND(inet, ONE_PARAM_(CassInet value), value)
// Handle types are converted with from() before being bound; the pointer is
// dereferenced without a NULL check.
CASS_STATEMENT_BIND(collection, ONE_PARAM_(const CassCollection* value), value->from())
CASS_STATEMENT_BIND(tuple, ONE_PARAM_(const CassTuple* value), value->from())
CASS_STATEMENT_BIND(user_type, ONE_PARAM_(const CassUserType* value), value->from())
// Multi-argument values are wrapped in a small value type first.
CASS_STATEMENT_BIND(bytes, TWO_PARAMS_(const cass_byte_t* value, size_t value_size),
                    CassBytes(value, value_size))
CASS_STATEMENT_BIND(decimal,
                    THREE_PARAMS_(const cass_byte_t* varint, size_t varint_size, int scale),
                    CassDecimal(varint, varint_size, scale))
CASS_STATEMENT_BIND(duration,
                    THREE_PARAMS_(cass_int32_t months, cass_int32_t days, cass_int64_t nanos),
                    CassDuration(months, days, nanos))

#undef CASS_STATEMENT_BIND
219 
220 CassError cass_statement_bind_string(CassStatement* statement, size_t index, const char* value) {
221   return cass_statement_bind_string_n(statement, index, value, SAFE_STRLEN(value));
222 }
223 
cass_statement_bind_string_n(CassStatement * statement,size_t index,const char * value,size_t value_length)224 CassError cass_statement_bind_string_n(CassStatement* statement, size_t index, const char* value,
225                                        size_t value_length) {
226   return statement->set(index, CassString(value, value_length));
227 }
228 
cass_statement_bind_string_by_name(CassStatement * statement,const char * name,const char * value)229 CassError cass_statement_bind_string_by_name(CassStatement* statement, const char* name,
230                                              const char* value) {
231   return statement->set(StringRef(name), CassString(value, SAFE_STRLEN(value)));
232 }
233 
cass_statement_bind_string_by_name_n(CassStatement * statement,const char * name,size_t name_length,const char * value,size_t value_length)234 CassError cass_statement_bind_string_by_name_n(CassStatement* statement, const char* name,
235                                                size_t name_length, const char* value,
236                                                size_t value_length) {
237   return statement->set(StringRef(name, name_length), CassString(value, value_length));
238 }
239 
cass_statement_bind_custom(CassStatement * statement,size_t index,const char * class_name,const cass_byte_t * value,size_t value_size)240 CassError cass_statement_bind_custom(CassStatement* statement, size_t index, const char* class_name,
241                                      const cass_byte_t* value, size_t value_size) {
242   return statement->set(index, CassCustom(StringRef(class_name), value, value_size));
243 }
244 
cass_statement_bind_custom_n(CassStatement * statement,size_t index,const char * class_name,size_t class_name_length,const cass_byte_t * value,size_t value_size)245 CassError cass_statement_bind_custom_n(CassStatement* statement, size_t index,
246                                        const char* class_name, size_t class_name_length,
247                                        const cass_byte_t* value, size_t value_size) {
248   return statement->set(index,
249                         CassCustom(StringRef(class_name, class_name_length), value, value_size));
250 }
251 
cass_statement_bind_custom_by_name(CassStatement * statement,const char * name,const char * class_name,const cass_byte_t * value,size_t value_size)252 CassError cass_statement_bind_custom_by_name(CassStatement* statement, const char* name,
253                                              const char* class_name, const cass_byte_t* value,
254                                              size_t value_size) {
255   return statement->set(StringRef(name), CassCustom(StringRef(class_name), value, value_size));
256 }
257 
cass_statement_bind_custom_by_name_n(CassStatement * statement,const char * name,size_t name_length,const char * class_name,size_t class_name_length,const cass_byte_t * value,size_t value_size)258 CassError cass_statement_bind_custom_by_name_n(CassStatement* statement, const char* name,
259                                                size_t name_length, const char* class_name,
260                                                size_t class_name_length, const cass_byte_t* value,
261                                                size_t value_size) {
262   return statement->set(StringRef(name, name_length),
263                         CassCustom(StringRef(class_name, class_name_length), value, value_size));
264 }
265 
266 } // extern "C"
267 
Statement(const char * query,size_t query_length,size_t values_count)268 Statement::Statement(const char* query, size_t query_length, size_t values_count)
269     : RoutableRequest(CQL_OPCODE_QUERY)
270     , AbstractData(values_count)
271     , query_or_id_(sizeof(int32_t) + query_length)
272     , flags_(0)
273     , page_size_(-1) {
274   // <query> [long string]
275   query_or_id_.encode_long_string(0, query, query_length);
276 }
277 
Statement(const Prepared * prepared)278 Statement::Statement(const Prepared* prepared)
279     : RoutableRequest(CQL_OPCODE_EXECUTE)
280     , AbstractData(prepared->result()->column_count())
281     , query_or_id_(sizeof(uint16_t) + prepared->id().size())
282     , flags_(0)
283     , page_size_(-1) {
284   // <id> [short bytes] (or [string])
285   const String& id = prepared->id();
286   query_or_id_.encode_string(0, id.data(), static_cast<uint16_t>(id.size()));
287   // Inherit settings and keyspace from the prepared statement
288   set_settings(prepared->request_settings());
289   // If the keyspace wasn't explictly set then attempt to set it using the
290   // prepared statement's result metadata.
291   if (keyspace().empty()) {
292     set_keyspace(prepared->result()->quoted_keyspace());
293   }
294 }
295 
query() const296 String Statement::query() const {
297   if (opcode() == CQL_OPCODE_QUERY) {
298     return String(query_or_id_.data() + sizeof(int32_t), query_or_id_.size() - sizeof(int32_t));
299   }
300   return String();
301 }
302 
303 // Format: <kind><string_or_id><n><value_1>...<value_n>
304 // where:
305 // <kind> is a [byte]
306 // <string_or_id> is a [long string] for <string> and a [short bytes] for <id>
307 // <n> is a [short]
308 // <value> is a [bytes]
encode_batch(ProtocolVersion version,RequestCallback * callback,BufferVec * bufs) const309 int32_t Statement::encode_batch(ProtocolVersion version, RequestCallback* callback,
310                                 BufferVec* bufs) const {
311   int32_t length = 0;
312 
313   { // <kind> [byte]
314     bufs->push_back(Buffer(sizeof(uint8_t)));
315     Buffer& buf = bufs->back();
316     buf.encode_byte(0, kind());
317     length += sizeof(uint8_t);
318   }
319 
320   bufs->push_back(query_or_id_);
321   length += query_or_id_.size();
322 
323   { // <n> [short]
324     bufs->push_back(Buffer(sizeof(uint16_t)));
325     Buffer& buf = bufs->back();
326     buf.encode_uint16(0, static_cast<uint16_t>(elements().size()));
327     length += sizeof(uint16_t);
328   }
329 
330   if (elements().size() > 0) {
331     int32_t result = encode_values(version, callback, bufs);
332     if (result < 0) return result;
333     length += result;
334   }
335 
336   return length;
337 }
338 
with_keyspace(ProtocolVersion version) const339 bool Statement::with_keyspace(ProtocolVersion version) const {
340   return version.supports_set_keyspace() &&
341          // Execute requests (bound statements) use the keyspace
342          // from the time of prepare.
343          opcode() != CQL_OPCODE_EXECUTE && !keyspace().empty();
344 }
345 
346 // For query statements the format is:
347 // <query><consistency><flags><n>
348 // where:
349 // <query> has the format [long string]
350 // <consistency> is a [short]
351 // <flags> is a [byte] (or [int] for protocol v5)
352 // <n> is a [short]
353 //
354 // For execute statements the format is:
355 // <id><consistency><flags><n>
356 // where:
357 // <id> has the format [short bytes] (or [string])
358 // <consistency> is a [short]
359 // <flags> is a [byte] (or [int] for protocol v5)
360 // <n> is a [short]
361 
encode_query_or_id(BufferVec * bufs) const362 int32_t Statement::encode_query_or_id(BufferVec* bufs) const {
363   bufs->push_back(query_or_id_);
364   return query_or_id_.size();
365 }
366 
encode_begin(ProtocolVersion version,uint16_t element_count,RequestCallback * callback,BufferVec * bufs) const367 int32_t Statement::encode_begin(ProtocolVersion version, uint16_t element_count,
368                                 RequestCallback* callback, BufferVec* bufs) const {
369   int32_t length = 0;
370   size_t query_params_buf_size = 0;
371   int32_t flags = flags_;
372 
373   if (callback->skip_metadata()) {
374     flags |= CASS_QUERY_FLAG_SKIP_METADATA;
375   }
376 
377   query_params_buf_size += sizeof(uint16_t); // <consistency> [short]
378 
379   if (version >= CASS_PROTOCOL_VERSION_V5) {
380     query_params_buf_size += sizeof(int32_t); // <flags> [int]
381   } else {
382     query_params_buf_size += sizeof(uint8_t); // <flags> [byte]
383   }
384 
385   if (element_count > 0) {
386     query_params_buf_size += sizeof(uint16_t); // <n> [short]
387     flags |= CASS_QUERY_FLAG_VALUES;
388   }
389 
390   if (page_size() > 0) {
391     flags |= CASS_QUERY_FLAG_PAGE_SIZE;
392   }
393 
394   if (!paging_state().empty()) {
395     flags |= CASS_QUERY_FLAG_PAGING_STATE;
396   }
397 
398   if (callback->serial_consistency() != 0) {
399     flags |= CASS_QUERY_FLAG_SERIAL_CONSISTENCY;
400   }
401 
402   if (callback->timestamp() != CASS_INT64_MIN) {
403     flags |= CASS_QUERY_FLAG_DEFAULT_TIMESTAMP;
404   }
405 
406   if (with_keyspace(version)) {
407     flags |= CASS_QUERY_FLAG_WITH_KEYSPACE;
408   }
409 
410   bufs->push_back(Buffer(query_params_buf_size));
411   length += query_params_buf_size;
412 
413   Buffer& buf = bufs->back();
414   size_t pos = buf.encode_uint16(0, callback->consistency());
415 
416   if (version >= CASS_PROTOCOL_VERSION_V5) {
417     pos = buf.encode_int32(pos, flags);
418   } else {
419     pos = buf.encode_byte(pos, static_cast<uint8_t>(flags));
420   }
421 
422   if (element_count > 0) {
423     buf.encode_uint16(pos, element_count);
424   }
425 
426   return length;
427 }
428 
429 // Format: [<value_1>...<value_n>]
430 // where:
431 // <value> is a [bytes]
encode_values(ProtocolVersion version,RequestCallback * callback,BufferVec * bufs) const432 int32_t Statement::encode_values(ProtocolVersion version, RequestCallback* callback,
433                                  BufferVec* bufs) const {
434   int32_t length = 0;
435   for (size_t i = 0; i < elements().size(); ++i) {
436     const Element& element = elements()[i];
437     if (!element.is_unset()) {
438       bufs->push_back(element.get_buffer());
439     } else {
440       if (version >= CASS_PROTOCOL_VERSION_V4) {
441         bufs->push_back(core::encode_with_length(CassUnset()));
442       } else {
443         OStringStream ss;
444         ss << "Query parameter at index " << i << " was not set";
445         callback->on_error(CASS_ERROR_LIB_PARAMETER_UNSET, ss.str());
446         return Request::REQUEST_ERROR_PARAMETER_UNSET;
447       }
448     }
449     length += bufs->back().size();
450   }
451   return length;
452 }
453 
454 // Format: [<result_page_size>][<paging_state>][<serial_consistency>][<timestamp>]
455 // where:
456 // <result_page_size> is a [int]
457 // <paging_state> is a [bytes]
458 // <serial_consistency> is a [short]
459 // <timestamp> is a [long]
460 // <keyspace> is a [string]
encode_end(ProtocolVersion version,RequestCallback * callback,BufferVec * bufs) const461 int32_t Statement::encode_end(ProtocolVersion version, RequestCallback* callback,
462                               BufferVec* bufs) const {
463   int32_t length = 0;
464   size_t paging_buf_size = 0;
465 
466   bool with_keyspace = this->with_keyspace(version);
467 
468   if (page_size() > 0) {
469     paging_buf_size += sizeof(int32_t); // [int]
470   }
471 
472   if (!paging_state().empty()) {
473     paging_buf_size += sizeof(int32_t) + paging_state().size(); // [bytes]
474   }
475 
476   if (callback->serial_consistency() != 0) {
477     paging_buf_size += sizeof(uint16_t); // [short]
478   }
479 
480   if (callback->timestamp() != CASS_INT64_MIN) {
481     paging_buf_size += sizeof(int64_t); // [long]
482   }
483 
484   if (with_keyspace) {
485     paging_buf_size += sizeof(uint16_t) + keyspace().size();
486   }
487 
488   if (paging_buf_size > 0) {
489     bufs->push_back(Buffer(paging_buf_size));
490     length += paging_buf_size;
491 
492     Buffer& buf = bufs->back();
493     size_t pos = 0;
494 
495     if (page_size() >= 0) {
496       pos = buf.encode_int32(pos, page_size());
497     }
498 
499     if (!paging_state().empty()) {
500       pos = buf.encode_bytes(pos, paging_state().data(), paging_state().size());
501     }
502 
503     if (callback->serial_consistency() != 0) {
504       pos = buf.encode_uint16(pos, callback->serial_consistency());
505     }
506 
507     if (callback->timestamp() != CASS_INT64_MIN) {
508       pos = buf.encode_int64(pos, callback->timestamp());
509     }
510 
511     if (with_keyspace) {
512       pos = buf.encode_string(pos, keyspace().data(), static_cast<uint16_t>(keyspace().size()));
513     }
514   }
515 
516   return length;
517 }
518 
calculate_routing_key(const Vector<size_t> & key_indices,String * routing_key) const519 bool Statement::calculate_routing_key(const Vector<size_t>& key_indices,
520                                       String* routing_key) const {
521   if (key_indices.empty()) return false;
522 
523   if (key_indices.size() == 1) {
524     assert(key_indices.front() < elements().size());
525     const AbstractData::Element& element(elements()[key_indices.front()]);
526     if (element.is_unset() || element.is_null()) {
527       return false;
528     }
529     Buffer buf(element.get_buffer());
530     routing_key->assign(buf.data() + sizeof(int32_t), buf.size() - sizeof(int32_t));
531   } else {
532     size_t length = 0;
533 
534     for (Vector<size_t>::const_iterator i = key_indices.begin(); i != key_indices.end(); ++i) {
535       assert(*i < elements().size());
536       const AbstractData::Element& element(elements()[*i]);
537       if (element.is_unset() || element.is_null()) {
538         return false;
539       }
540       size_t size = element.get_size() - sizeof(int32_t);
541       length += sizeof(uint16_t) + size + 1;
542     }
543 
544     routing_key->clear();
545     routing_key->reserve(length);
546 
547     for (Vector<size_t>::const_iterator i = key_indices.begin(); i != key_indices.end(); ++i) {
548       const AbstractData::Element& element(elements()[*i]);
549       Buffer buf(element.get_buffer());
550       size_t size = buf.size() - sizeof(int32_t);
551 
552       char size_buf[sizeof(uint16_t)];
553       encode_uint16(size_buf, static_cast<uint16_t>(size));
554       routing_key->append(size_buf, sizeof(uint16_t));
555       routing_key->append(buf.data() + sizeof(int32_t), size);
556       routing_key->push_back(0);
557     }
558   }
559 
560   return true;
561 }
562