1 /*
2 Copyright (c) DataStax, Inc.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #include "statement.hpp"
18
19 #include "collection.hpp"
20 #include "execute_request.hpp"
21 #include "external.hpp"
22 #include "macros.hpp"
23 #include "prepared.hpp"
24 #include "protocol.hpp"
25 #include "query_request.hpp"
26 #include "request_callback.hpp"
27 #include "scoped_ptr.hpp"
28 #include "string_ref.hpp"
29 #include "tuple.hpp"
30 #include "user_type_value.hpp"
31
32 #include <uv.h>
33
34 using namespace datastax;
35 using namespace datastax::internal;
36 using namespace datastax::internal::core;
37
38 extern "C" {
39
cass_statement_new(const char * query,size_t parameter_count)40 CassStatement* cass_statement_new(const char* query, size_t parameter_count) {
41 return cass_statement_new_n(query, SAFE_STRLEN(query), parameter_count);
42 }
43
cass_statement_new_n(const char * query,size_t query_length,size_t parameter_count)44 CassStatement* cass_statement_new_n(const char* query, size_t query_length,
45 size_t parameter_count) {
46 QueryRequest* query_request = new QueryRequest(query, query_length, parameter_count);
47 query_request->inc_ref();
48 return CassStatement::to(query_request);
49 }
50
cass_statement_reset_parameters(CassStatement * statement,size_t count)51 CassError cass_statement_reset_parameters(CassStatement* statement, size_t count) {
52 statement->reset(count);
53 return CASS_OK;
54 }
55
cass_statement_add_key_index(CassStatement * statement,size_t index)56 CassError cass_statement_add_key_index(CassStatement* statement, size_t index) {
57 if (statement->kind() != CASS_BATCH_KIND_QUERY) return CASS_ERROR_LIB_BAD_PARAMS;
58 if (index >= statement->elements().size()) return CASS_ERROR_LIB_BAD_PARAMS;
59 statement->add_key_index(index);
60 return CASS_OK;
61 }
62
cass_statement_set_keyspace(CassStatement * statement,const char * keyspace)63 CassError cass_statement_set_keyspace(CassStatement* statement, const char* keyspace) {
64 return cass_statement_set_keyspace_n(statement, keyspace, SAFE_STRLEN(keyspace));
65 }
66
cass_statement_set_keyspace_n(CassStatement * statement,const char * keyspace,size_t keyspace_length)67 CassError cass_statement_set_keyspace_n(CassStatement* statement, const char* keyspace,
68 size_t keyspace_length) {
69 // The keyspace is set by the prepared metadata
70 if (statement->opcode() == CQL_OPCODE_EXECUTE) {
71 return CASS_ERROR_LIB_BAD_PARAMS;
72 }
73 statement->set_keyspace(String(keyspace, keyspace_length));
74 return CASS_OK;
75 }
76
cass_statement_free(CassStatement * statement)77 void cass_statement_free(CassStatement* statement) { statement->dec_ref(); }
78
cass_statement_set_consistency(CassStatement * statement,CassConsistency consistency)79 CassError cass_statement_set_consistency(CassStatement* statement, CassConsistency consistency) {
80 statement->set_consistency(consistency);
81 return CASS_OK;
82 }
83
cass_statement_set_serial_consistency(CassStatement * statement,CassConsistency serial_consistency)84 CassError cass_statement_set_serial_consistency(CassStatement* statement,
85 CassConsistency serial_consistency) {
86 statement->set_serial_consistency(serial_consistency);
87 return CASS_OK;
88 }
89
cass_statement_set_paging_size(CassStatement * statement,int page_size)90 CassError cass_statement_set_paging_size(CassStatement* statement, int page_size) {
91 statement->set_page_size(page_size);
92 return CASS_OK;
93 }
94
cass_statement_set_paging_state(CassStatement * statement,const CassResult * result)95 CassError cass_statement_set_paging_state(CassStatement* statement, const CassResult* result) {
96 statement->set_paging_state(result->paging_state().to_string());
97 return CASS_OK;
98 }
99
cass_statement_set_paging_state_token(CassStatement * statement,const char * paging_state,size_t paging_state_size)100 CassError cass_statement_set_paging_state_token(CassStatement* statement, const char* paging_state,
101 size_t paging_state_size) {
102 statement->set_paging_state(String(paging_state, paging_state_size));
103 return CASS_OK;
104 }
105
cass_statement_set_retry_policy(CassStatement * statement,CassRetryPolicy * retry_policy)106 CassError cass_statement_set_retry_policy(CassStatement* statement, CassRetryPolicy* retry_policy) {
107 statement->set_retry_policy(retry_policy);
108 return CASS_OK;
109 }
110
cass_statement_set_timestamp(CassStatement * statement,cass_int64_t timestamp)111 CassError cass_statement_set_timestamp(CassStatement* statement, cass_int64_t timestamp) {
112 statement->set_timestamp(timestamp);
113 return CASS_OK;
114 }
115
cass_statement_set_request_timeout(CassStatement * statement,cass_uint64_t timeout_ms)116 CassError cass_statement_set_request_timeout(CassStatement* statement, cass_uint64_t timeout_ms) {
117 statement->set_request_timeout_ms(timeout_ms);
118 return CASS_OK;
119 }
120
cass_statement_set_is_idempotent(CassStatement * statement,cass_bool_t is_idempotent)121 CassError cass_statement_set_is_idempotent(CassStatement* statement, cass_bool_t is_idempotent) {
122 statement->set_is_idempotent(is_idempotent == cass_true);
123 return CASS_OK;
124 }
125
cass_statement_set_custom_payload(CassStatement * statement,const CassCustomPayload * payload)126 CassError cass_statement_set_custom_payload(CassStatement* statement,
127 const CassCustomPayload* payload) {
128 statement->set_custom_payload(payload);
129 return CASS_OK;
130 }
131
cass_statement_set_execution_profile(CassStatement * statement,const char * name)132 CassError cass_statement_set_execution_profile(CassStatement* statement, const char* name) {
133 return cass_statement_set_execution_profile_n(statement, name, SAFE_STRLEN(name));
134 }
135
cass_statement_set_execution_profile_n(CassStatement * statement,const char * name,size_t name_length)136 CassError cass_statement_set_execution_profile_n(CassStatement* statement, const char* name,
137 size_t name_length) {
138 if (name_length > 0) {
139 statement->set_execution_profile_name(String(name, name_length));
140 } else {
141 statement->set_execution_profile_name(String());
142 }
143 return CASS_OK;
144 }
145
cass_statement_set_tracing(CassStatement * statement,cass_bool_t enabled)146 CassError cass_statement_set_tracing(CassStatement* statement, cass_bool_t enabled) {
147 statement->set_tracing(enabled == cass_true);
148 return CASS_OK;
149 }
150
cass_statement_set_host(CassStatement * statement,const char * host,int port)151 CassError cass_statement_set_host(CassStatement* statement, const char* host, int port) {
152 return cass_statement_set_host_n(statement, host, SAFE_STRLEN(host), port);
153 }
154
cass_statement_set_host_n(CassStatement * statement,const char * host,size_t host_length,int port)155 CassError cass_statement_set_host_n(CassStatement* statement, const char* host, size_t host_length,
156 int port) {
157 Address address(String(host, host_length), port);
158 if (!address.is_valid_and_resolved()) {
159 return CASS_ERROR_LIB_BAD_PARAMS;
160 }
161 statement->set_host(address);
162 return CASS_OK;
163 }
164
cass_statement_set_host_inet(CassStatement * statement,const CassInet * host,int port)165 CassError cass_statement_set_host_inet(CassStatement* statement, const CassInet* host, int port) {
166 Address address(host->address, host->address_length, port);
167 if (!address.is_valid_and_resolved()) {
168 return CASS_ERROR_LIB_BAD_PARAMS;
169 }
170 statement->set_host(address);
171 return CASS_OK;
172 }
173
cass_statement_set_node(CassStatement * statement,const CassNode * node)174 CassError cass_statement_set_node(CassStatement* statement, const CassNode* node) {
175 if (node == NULL) {
176 return CASS_ERROR_LIB_BAD_PARAMS;
177 }
178 statement->set_host(*node->from());
179 return CASS_OK;
180 }
181
182 #define CASS_STATEMENT_BIND(Name, Params, Value) \
183 CassError cass_statement_bind_##Name(CassStatement* statement, size_t index Params) { \
184 return statement->set(index, Value); \
185 } \
186 CassError cass_statement_bind_##Name##_by_name(CassStatement* statement, \
187 const char* name Params) { \
188 return statement->set(StringRef(name), Value); \
189 } \
190 CassError cass_statement_bind_##Name##_by_name_n(CassStatement* statement, const char* name, \
191 size_t name_length Params) { \
192 return statement->set(StringRef(name, name_length), Value); \
193 }
194
CASS_STATEMENT_BIND(null,ZERO_PARAMS_ (),CassNull ())195 CASS_STATEMENT_BIND(null, ZERO_PARAMS_(), CassNull())
196 CASS_STATEMENT_BIND(int8, ONE_PARAM_(cass_int8_t value), value)
197 CASS_STATEMENT_BIND(int16, ONE_PARAM_(cass_int16_t value), value)
198 CASS_STATEMENT_BIND(int32, ONE_PARAM_(cass_int32_t value), value)
199 CASS_STATEMENT_BIND(uint32, ONE_PARAM_(cass_uint32_t value), value)
200 CASS_STATEMENT_BIND(int64, ONE_PARAM_(cass_int64_t value), value)
201 CASS_STATEMENT_BIND(float, ONE_PARAM_(cass_float_t value), value)
202 CASS_STATEMENT_BIND(double, ONE_PARAM_(cass_double_t value), value)
203 CASS_STATEMENT_BIND(bool, ONE_PARAM_(cass_bool_t value), value)
204 CASS_STATEMENT_BIND(uuid, ONE_PARAM_(CassUuid value), value)
205 CASS_STATEMENT_BIND(inet, ONE_PARAM_(CassInet value), value)
206 CASS_STATEMENT_BIND(collection, ONE_PARAM_(const CassCollection* value), value->from())
207 CASS_STATEMENT_BIND(tuple, ONE_PARAM_(const CassTuple* value), value->from())
208 CASS_STATEMENT_BIND(user_type, ONE_PARAM_(const CassUserType* value), value->from())
209 CASS_STATEMENT_BIND(bytes, TWO_PARAMS_(const cass_byte_t* value, size_t value_size),
210 CassBytes(value, value_size))
211 CASS_STATEMENT_BIND(decimal,
212 THREE_PARAMS_(const cass_byte_t* varint, size_t varint_size, int scale),
213 CassDecimal(varint, varint_size, scale))
214 CASS_STATEMENT_BIND(duration,
215 THREE_PARAMS_(cass_int32_t months, cass_int32_t days, cass_int64_t nanos),
216 CassDuration(months, days, nanos))
217
218 #undef CASS_STATEMENT_BIND
219
220 CassError cass_statement_bind_string(CassStatement* statement, size_t index, const char* value) {
221 return cass_statement_bind_string_n(statement, index, value, SAFE_STRLEN(value));
222 }
223
cass_statement_bind_string_n(CassStatement * statement,size_t index,const char * value,size_t value_length)224 CassError cass_statement_bind_string_n(CassStatement* statement, size_t index, const char* value,
225 size_t value_length) {
226 return statement->set(index, CassString(value, value_length));
227 }
228
cass_statement_bind_string_by_name(CassStatement * statement,const char * name,const char * value)229 CassError cass_statement_bind_string_by_name(CassStatement* statement, const char* name,
230 const char* value) {
231 return statement->set(StringRef(name), CassString(value, SAFE_STRLEN(value)));
232 }
233
cass_statement_bind_string_by_name_n(CassStatement * statement,const char * name,size_t name_length,const char * value,size_t value_length)234 CassError cass_statement_bind_string_by_name_n(CassStatement* statement, const char* name,
235 size_t name_length, const char* value,
236 size_t value_length) {
237 return statement->set(StringRef(name, name_length), CassString(value, value_length));
238 }
239
cass_statement_bind_custom(CassStatement * statement,size_t index,const char * class_name,const cass_byte_t * value,size_t value_size)240 CassError cass_statement_bind_custom(CassStatement* statement, size_t index, const char* class_name,
241 const cass_byte_t* value, size_t value_size) {
242 return statement->set(index, CassCustom(StringRef(class_name), value, value_size));
243 }
244
cass_statement_bind_custom_n(CassStatement * statement,size_t index,const char * class_name,size_t class_name_length,const cass_byte_t * value,size_t value_size)245 CassError cass_statement_bind_custom_n(CassStatement* statement, size_t index,
246 const char* class_name, size_t class_name_length,
247 const cass_byte_t* value, size_t value_size) {
248 return statement->set(index,
249 CassCustom(StringRef(class_name, class_name_length), value, value_size));
250 }
251
cass_statement_bind_custom_by_name(CassStatement * statement,const char * name,const char * class_name,const cass_byte_t * value,size_t value_size)252 CassError cass_statement_bind_custom_by_name(CassStatement* statement, const char* name,
253 const char* class_name, const cass_byte_t* value,
254 size_t value_size) {
255 return statement->set(StringRef(name), CassCustom(StringRef(class_name), value, value_size));
256 }
257
cass_statement_bind_custom_by_name_n(CassStatement * statement,const char * name,size_t name_length,const char * class_name,size_t class_name_length,const cass_byte_t * value,size_t value_size)258 CassError cass_statement_bind_custom_by_name_n(CassStatement* statement, const char* name,
259 size_t name_length, const char* class_name,
260 size_t class_name_length, const cass_byte_t* value,
261 size_t value_size) {
262 return statement->set(StringRef(name, name_length),
263 CassCustom(StringRef(class_name, class_name_length), value, value_size));
264 }
265
266 } // extern "C"
267
Statement(const char * query,size_t query_length,size_t values_count)268 Statement::Statement(const char* query, size_t query_length, size_t values_count)
269 : RoutableRequest(CQL_OPCODE_QUERY)
270 , AbstractData(values_count)
271 , query_or_id_(sizeof(int32_t) + query_length)
272 , flags_(0)
273 , page_size_(-1) {
274 // <query> [long string]
275 query_or_id_.encode_long_string(0, query, query_length);
276 }
277
Statement(const Prepared * prepared)278 Statement::Statement(const Prepared* prepared)
279 : RoutableRequest(CQL_OPCODE_EXECUTE)
280 , AbstractData(prepared->result()->column_count())
281 , query_or_id_(sizeof(uint16_t) + prepared->id().size())
282 , flags_(0)
283 , page_size_(-1) {
284 // <id> [short bytes] (or [string])
285 const String& id = prepared->id();
286 query_or_id_.encode_string(0, id.data(), static_cast<uint16_t>(id.size()));
287 // Inherit settings and keyspace from the prepared statement
288 set_settings(prepared->request_settings());
289 // If the keyspace wasn't explictly set then attempt to set it using the
290 // prepared statement's result metadata.
291 if (keyspace().empty()) {
292 set_keyspace(prepared->result()->quoted_keyspace());
293 }
294 }
295
query() const296 String Statement::query() const {
297 if (opcode() == CQL_OPCODE_QUERY) {
298 return String(query_or_id_.data() + sizeof(int32_t), query_or_id_.size() - sizeof(int32_t));
299 }
300 return String();
301 }
302
303 // Format: <kind><string_or_id><n><value_1>...<value_n>
304 // where:
305 // <kind> is a [byte]
306 // <string_or_id> is a [long string] for <string> and a [short bytes] for <id>
307 // <n> is a [short]
308 // <value> is a [bytes]
encode_batch(ProtocolVersion version,RequestCallback * callback,BufferVec * bufs) const309 int32_t Statement::encode_batch(ProtocolVersion version, RequestCallback* callback,
310 BufferVec* bufs) const {
311 int32_t length = 0;
312
313 { // <kind> [byte]
314 bufs->push_back(Buffer(sizeof(uint8_t)));
315 Buffer& buf = bufs->back();
316 buf.encode_byte(0, kind());
317 length += sizeof(uint8_t);
318 }
319
320 bufs->push_back(query_or_id_);
321 length += query_or_id_.size();
322
323 { // <n> [short]
324 bufs->push_back(Buffer(sizeof(uint16_t)));
325 Buffer& buf = bufs->back();
326 buf.encode_uint16(0, static_cast<uint16_t>(elements().size()));
327 length += sizeof(uint16_t);
328 }
329
330 if (elements().size() > 0) {
331 int32_t result = encode_values(version, callback, bufs);
332 if (result < 0) return result;
333 length += result;
334 }
335
336 return length;
337 }
338
with_keyspace(ProtocolVersion version) const339 bool Statement::with_keyspace(ProtocolVersion version) const {
340 return version.supports_set_keyspace() &&
341 // Execute requests (bound statements) use the keyspace
342 // from the time of prepare.
343 opcode() != CQL_OPCODE_EXECUTE && !keyspace().empty();
344 }
345
346 // For query statements the format is:
347 // <query><consistency><flags><n>
348 // where:
349 // <query> has the format [long string]
350 // <consistency> is a [short]
351 // <flags> is a [byte] (or [int] for protocol v5)
352 // <n> is a [short]
353 //
354 // For execute statements the format is:
355 // <id><consistency><flags><n>
356 // where:
357 // <id> has the format [short bytes] (or [string])
358 // <consistency> is a [short]
359 // <flags> is a [byte] (or [int] for protocol v5)
360 // <n> is a [short]
361
encode_query_or_id(BufferVec * bufs) const362 int32_t Statement::encode_query_or_id(BufferVec* bufs) const {
363 bufs->push_back(query_or_id_);
364 return query_or_id_.size();
365 }
366
encode_begin(ProtocolVersion version,uint16_t element_count,RequestCallback * callback,BufferVec * bufs) const367 int32_t Statement::encode_begin(ProtocolVersion version, uint16_t element_count,
368 RequestCallback* callback, BufferVec* bufs) const {
369 int32_t length = 0;
370 size_t query_params_buf_size = 0;
371 int32_t flags = flags_;
372
373 if (callback->skip_metadata()) {
374 flags |= CASS_QUERY_FLAG_SKIP_METADATA;
375 }
376
377 query_params_buf_size += sizeof(uint16_t); // <consistency> [short]
378
379 if (version >= CASS_PROTOCOL_VERSION_V5) {
380 query_params_buf_size += sizeof(int32_t); // <flags> [int]
381 } else {
382 query_params_buf_size += sizeof(uint8_t); // <flags> [byte]
383 }
384
385 if (element_count > 0) {
386 query_params_buf_size += sizeof(uint16_t); // <n> [short]
387 flags |= CASS_QUERY_FLAG_VALUES;
388 }
389
390 if (page_size() > 0) {
391 flags |= CASS_QUERY_FLAG_PAGE_SIZE;
392 }
393
394 if (!paging_state().empty()) {
395 flags |= CASS_QUERY_FLAG_PAGING_STATE;
396 }
397
398 if (callback->serial_consistency() != 0) {
399 flags |= CASS_QUERY_FLAG_SERIAL_CONSISTENCY;
400 }
401
402 if (callback->timestamp() != CASS_INT64_MIN) {
403 flags |= CASS_QUERY_FLAG_DEFAULT_TIMESTAMP;
404 }
405
406 if (with_keyspace(version)) {
407 flags |= CASS_QUERY_FLAG_WITH_KEYSPACE;
408 }
409
410 bufs->push_back(Buffer(query_params_buf_size));
411 length += query_params_buf_size;
412
413 Buffer& buf = bufs->back();
414 size_t pos = buf.encode_uint16(0, callback->consistency());
415
416 if (version >= CASS_PROTOCOL_VERSION_V5) {
417 pos = buf.encode_int32(pos, flags);
418 } else {
419 pos = buf.encode_byte(pos, static_cast<uint8_t>(flags));
420 }
421
422 if (element_count > 0) {
423 buf.encode_uint16(pos, element_count);
424 }
425
426 return length;
427 }
428
429 // Format: [<value_1>...<value_n>]
430 // where:
431 // <value> is a [bytes]
encode_values(ProtocolVersion version,RequestCallback * callback,BufferVec * bufs) const432 int32_t Statement::encode_values(ProtocolVersion version, RequestCallback* callback,
433 BufferVec* bufs) const {
434 int32_t length = 0;
435 for (size_t i = 0; i < elements().size(); ++i) {
436 const Element& element = elements()[i];
437 if (!element.is_unset()) {
438 bufs->push_back(element.get_buffer());
439 } else {
440 if (version >= CASS_PROTOCOL_VERSION_V4) {
441 bufs->push_back(core::encode_with_length(CassUnset()));
442 } else {
443 OStringStream ss;
444 ss << "Query parameter at index " << i << " was not set";
445 callback->on_error(CASS_ERROR_LIB_PARAMETER_UNSET, ss.str());
446 return Request::REQUEST_ERROR_PARAMETER_UNSET;
447 }
448 }
449 length += bufs->back().size();
450 }
451 return length;
452 }
453
454 // Format: [<result_page_size>][<paging_state>][<serial_consistency>][<timestamp>]
455 // where:
456 // <result_page_size> is a [int]
457 // <paging_state> is a [bytes]
458 // <serial_consistency> is a [short]
459 // <timestamp> is a [long]
460 // <keyspace> is a [string]
encode_end(ProtocolVersion version,RequestCallback * callback,BufferVec * bufs) const461 int32_t Statement::encode_end(ProtocolVersion version, RequestCallback* callback,
462 BufferVec* bufs) const {
463 int32_t length = 0;
464 size_t paging_buf_size = 0;
465
466 bool with_keyspace = this->with_keyspace(version);
467
468 if (page_size() > 0) {
469 paging_buf_size += sizeof(int32_t); // [int]
470 }
471
472 if (!paging_state().empty()) {
473 paging_buf_size += sizeof(int32_t) + paging_state().size(); // [bytes]
474 }
475
476 if (callback->serial_consistency() != 0) {
477 paging_buf_size += sizeof(uint16_t); // [short]
478 }
479
480 if (callback->timestamp() != CASS_INT64_MIN) {
481 paging_buf_size += sizeof(int64_t); // [long]
482 }
483
484 if (with_keyspace) {
485 paging_buf_size += sizeof(uint16_t) + keyspace().size();
486 }
487
488 if (paging_buf_size > 0) {
489 bufs->push_back(Buffer(paging_buf_size));
490 length += paging_buf_size;
491
492 Buffer& buf = bufs->back();
493 size_t pos = 0;
494
495 if (page_size() >= 0) {
496 pos = buf.encode_int32(pos, page_size());
497 }
498
499 if (!paging_state().empty()) {
500 pos = buf.encode_bytes(pos, paging_state().data(), paging_state().size());
501 }
502
503 if (callback->serial_consistency() != 0) {
504 pos = buf.encode_uint16(pos, callback->serial_consistency());
505 }
506
507 if (callback->timestamp() != CASS_INT64_MIN) {
508 pos = buf.encode_int64(pos, callback->timestamp());
509 }
510
511 if (with_keyspace) {
512 pos = buf.encode_string(pos, keyspace().data(), static_cast<uint16_t>(keyspace().size()));
513 }
514 }
515
516 return length;
517 }
518
calculate_routing_key(const Vector<size_t> & key_indices,String * routing_key) const519 bool Statement::calculate_routing_key(const Vector<size_t>& key_indices,
520 String* routing_key) const {
521 if (key_indices.empty()) return false;
522
523 if (key_indices.size() == 1) {
524 assert(key_indices.front() < elements().size());
525 const AbstractData::Element& element(elements()[key_indices.front()]);
526 if (element.is_unset() || element.is_null()) {
527 return false;
528 }
529 Buffer buf(element.get_buffer());
530 routing_key->assign(buf.data() + sizeof(int32_t), buf.size() - sizeof(int32_t));
531 } else {
532 size_t length = 0;
533
534 for (Vector<size_t>::const_iterator i = key_indices.begin(); i != key_indices.end(); ++i) {
535 assert(*i < elements().size());
536 const AbstractData::Element& element(elements()[*i]);
537 if (element.is_unset() || element.is_null()) {
538 return false;
539 }
540 size_t size = element.get_size() - sizeof(int32_t);
541 length += sizeof(uint16_t) + size + 1;
542 }
543
544 routing_key->clear();
545 routing_key->reserve(length);
546
547 for (Vector<size_t>::const_iterator i = key_indices.begin(); i != key_indices.end(); ++i) {
548 const AbstractData::Element& element(elements()[*i]);
549 Buffer buf(element.get_buffer());
550 size_t size = buf.size() - sizeof(int32_t);
551
552 char size_buf[sizeof(uint16_t)];
553 encode_uint16(size_buf, static_cast<uint16_t>(size));
554 routing_key->append(size_buf, sizeof(uint16_t));
555 routing_key->append(buf.data() + sizeof(int32_t), size);
556 routing_key->push_back(0);
557 }
558 }
559
560 return true;
561 }
562