1
2 // Cassandra includes:
3 #include <inttypes.h>
4 #include <netinet/in.h>
5 #include <sys/time.h>
6 #include <stdio.h>
7 #include <stdarg.h>
8
9 #include "thrift/Thrift.h"
10 #include "thrift/transport/TSocket.h"
11 #include "thrift/transport/TTransport.h"
12 #include "thrift/transport/TBufferTransports.h"
13 #include "thrift/protocol/TProtocol.h"
14 #include "thrift/protocol/TBinaryProtocol.h"
15 #include "gen-cpp/Cassandra.h"
16 // cassandra includes end
17
18 #include "cassandra_se.h"
19
/*
  Length-counted string: a character pointer plus an explicit byte count.
  NOTE(review): presumably this is the struct behind the LEX_STRING type used
  by add_row_deletion(), defined locally so this file does not need the server
  headers -- confirm against cassandra_se.h. If so, the layout must stay
  identical to the server's definition.
*/
struct st_mysql_lex_string
{
  char *str;      /* first byte of the string data */
  size_t length;  /* number of bytes at str */
};
25
26 using namespace std;
27 using namespace apache::thrift;
28 using namespace apache::thrift::transport;
29 using namespace apache::thrift::protocol;
30 using namespace org::apache::cassandra;
31
32
/*
  Implementation of a connection to one Cassandra column family (i.e. table)
*/
36 class Cassandra_se_impl: public Cassandra_se_interface
37 {
38 CassandraClient *cass; /* Connection to cassandra */
39
40 std::string column_family;
41 std::string keyspace;
42
43 ConsistencyLevel::type write_consistency;
44 ConsistencyLevel::type read_consistency;
45
46 /* Connection data */
47 std::string host;
48 int port;
49 /* How many times to retry an operation before giving up */
50 int thrift_call_retries_to_do;
51
52 bool inside_try_operation;
53
54 /* DDL data */
55 KsDef ks_def; /* KeySpace we're using (TODO: put this in table->share) */
56 CfDef cf_def; /* Column family we're using (TODO: put in table->share)*/
57 std::vector<ColumnDef>::iterator column_ddl_it;
58
59 /* The list that was returned by the last key lookup */
60 std::vector<ColumnOrSuperColumn> column_data_vec;
61 std::vector<ColumnOrSuperColumn>::iterator column_data_it;
62
63 /* Insert preparation */
64 typedef std::map<std::string, std::vector<Mutation> > ColumnFamilyToMutation;
65 typedef std::map<std::string, ColumnFamilyToMutation> KeyToCfMutationMap;
66
67 KeyToCfMutationMap batch_mutation; /* Prepare operation here */
68 int64_t insert_timestamp;
69 std::vector<Mutation>* insert_list;
70
71 /* Resultset we're reading */
72 std::vector<KeySlice> key_slice_vec;
73 std::vector<KeySlice>::iterator key_slice_it;
74
75 std::string rowkey; /* key of the record we're returning now */
76
77 SlicePredicate slice_pred;
78 SliceRange slice_pred_sr;
79 bool get_slices_returned_less;
80 bool get_slice_found_rows;
81
82 bool reconnect();
83 public:
Cassandra_se_impl()84 Cassandra_se_impl() : cass(NULL),
85 write_consistency(ConsistencyLevel::ONE),
86 read_consistency(ConsistencyLevel::ONE),
87 thrift_call_retries_to_do(1),
88 inside_try_operation(false)
89 {}
~Cassandra_se_impl()90 virtual ~Cassandra_se_impl(){ delete cass; }
91
92 /* Connection and DDL checks */
93 bool connect(const char *host_arg, int port_arg, const char *keyspace);
set_column_family(const char * cfname)94 void set_column_family(const char *cfname) { column_family.assign(cfname); }
95
96 bool setup_ddl_checks();
97 void first_ddl_column();
98 bool next_ddl_column(char **name, int *name_len, char **value, int *value_len);
99 void get_rowkey_type(char **name, char **type);
100 size_t get_ddl_size();
101 const char* get_default_validator();
102
103 /* Settings */
104 void set_consistency_levels(unsigned long read_cons_level, unsigned long write_cons_level);
set_n_retries(uint retries_arg)105 virtual void set_n_retries(uint retries_arg) {
106 thrift_call_retries_to_do= retries_arg;
107 }
108
109 /* Writes */
110 void clear_insert_buffer();
111 void start_row_insert(const char *key, int key_len);
112 void add_insert_column(const char *name, int name_len,
113 const char *value, int value_len);
114 void add_insert_delete_column(const char *name, int name_len);
115 void add_row_deletion(const char *key, int key_len,
116 Column_name_enumerator *col_names,
117 LEX_STRING *names, uint nnames);
118
119 bool do_insert();
120
121 /* Reads, point lookups */
122 bool get_slice(char *key, size_t key_len, bool *found);
123 bool get_next_read_column(char **name, int *name_len,
124 char **value, int *value_len );
125 void get_read_rowkey(char **value, int *value_len);
126
127 /* Reads, multi-row scans */
128 private:
129 bool have_rowkey_to_skip;
130 std::string rowkey_to_skip;
131
132 bool get_range_slices_param_last_key_as_start_key;
133 public:
134 bool get_range_slices(bool last_key_as_start_key);
135 void finish_reading_range_slices();
136 bool get_next_range_slice_row(bool *eof);
137
138 /* Setup that's necessary before a multi-row read. (todo: use it before point lookups, too) */
139 void clear_read_columns();
140 void clear_read_all_columns();
141 void add_read_column(const char *name);
142
143 /* Reads, MRR scans */
144 void new_lookup_keys();
145 int add_lookup_key(const char *key, size_t key_len);
146 bool multiget_slice();
147
148 bool get_next_multiget_row();
149
150 bool truncate();
151
152 bool remove_row();
153
154 private:
155 bool retryable_truncate();
156 bool retryable_do_insert();
157 bool retryable_remove_row();
158 bool retryable_setup_ddl_checks();
159 bool retryable_multiget_slice();
160 bool retryable_get_range_slices();
161 bool retryable_get_slice();
162
163 std::vector<std::string> mrr_keys; /* can we use allocator to put these into MRR buffer? */
164 std::map<std::string, std::vector<ColumnOrSuperColumn> > mrr_result;
165 std::map<std::string, std::vector<ColumnOrSuperColumn> >::iterator mrr_result_it;
166
167 /* Non-inherited utility functions: */
168 int64_t get_i64_timestamp();
169
170 typedef bool (Cassandra_se_impl::*retryable_func_t)();
171 bool try_operation(retryable_func_t func);
172 };
173
174
175 /////////////////////////////////////////////////////////////////////////////
176 // Connection and setup
177 /////////////////////////////////////////////////////////////////////////////
create_cassandra_se()178 Cassandra_se_interface *create_cassandra_se()
179 {
180 return new Cassandra_se_impl;
181 }
182
183
connect(const char * host_arg,int port_arg,const char * keyspace_arg)184 bool Cassandra_se_impl::connect(const char *host_arg, int port_arg, const char *keyspace_arg)
185 {
186 keyspace.assign(keyspace_arg);
187 host.assign(host_arg);
188 port= port_arg;
189 return reconnect();
190 }
191
192
reconnect()193 bool Cassandra_se_impl::reconnect()
194 {
195
196 delete cass;
197 cass= NULL;
198
199 bool res= true;
200 try {
201 boost::shared_ptr<TTransport> socket =
202 boost::shared_ptr<TSocket>(new TSocket(host.c_str(), port));
203 boost::shared_ptr<TTransport> tr =
204 boost::shared_ptr<TFramedTransport>(new TFramedTransport (socket));
205 boost::shared_ptr<TProtocol> p =
206 boost::shared_ptr<TBinaryProtocol>(new TBinaryProtocol(tr));
207
208 cass= new CassandraClient(p);
209 tr->open();
210 cass->set_keyspace(keyspace.c_str());
211
212 res= false; // success
213 }catch(TTransportException te){
214 print_error("%s [%d]", te.what(), te.getType());
215 }catch(InvalidRequestException ire){
216 print_error("%s [%s]", ire.what(), ire.why.c_str());
217 }catch(NotFoundException nfe){
218 print_error("%s", nfe.what());
219 }catch(TException e){
220 print_error("Thrift exception: %s", e.what());
221 }catch (...) {
222 print_error("Unknown exception");
223 }
224
225 if (!res && setup_ddl_checks())
226 res= true;
227 return res;
228 }
229
230
set_consistency_levels(unsigned long read_cons_level,unsigned long write_cons_level)231 void Cassandra_se_impl::set_consistency_levels(unsigned long read_cons_level,
232 unsigned long write_cons_level)
233 {
234 write_consistency= (ConsistencyLevel::type)(write_cons_level + 1);
235 read_consistency= (ConsistencyLevel::type)(read_cons_level + 1);
236 }
237
238
retryable_setup_ddl_checks()239 bool Cassandra_se_impl::retryable_setup_ddl_checks()
240 {
241 try {
242
243 cass->describe_keyspace(ks_def, keyspace);
244
245 } catch (NotFoundException nfe) {
246 print_error("keyspace `%s` not found: %s", keyspace.c_str(), nfe.what());
247 return true;
248 }
249
250 std::vector<CfDef>::iterator it;
251 for (it= ks_def.cf_defs.begin(); it < ks_def.cf_defs.end(); it++)
252 {
253 cf_def= *it;
254 if (!cf_def.name.compare(column_family))
255 return false;
256 }
257
258 print_error("Column family %s not found in keyspace %s",
259 column_family.c_str(),
260 keyspace.c_str());
261 return true;
262 }
263
setup_ddl_checks()264 bool Cassandra_se_impl::setup_ddl_checks()
265 {
266 return try_operation(&Cassandra_se_impl::retryable_setup_ddl_checks);
267 }
268
269
first_ddl_column()270 void Cassandra_se_impl::first_ddl_column()
271 {
272 column_ddl_it= cf_def.column_metadata.begin();
273 }
274
275
next_ddl_column(char ** name,int * name_len,char ** type,int * type_len)276 bool Cassandra_se_impl::next_ddl_column(char **name, int *name_len,
277 char **type, int *type_len)
278 {
279 if (column_ddl_it == cf_def.column_metadata.end())
280 return true;
281
282 *name= (char*)(*column_ddl_it).name.c_str();
283 *name_len= (*column_ddl_it).name.length();
284
285 *type= (char*)(*column_ddl_it).validation_class.c_str();
286 *type_len= (*column_ddl_it).validation_class.length();
287
288 column_ddl_it++;
289 return false;
290 }
291
292
get_rowkey_type(char ** name,char ** type)293 void Cassandra_se_impl::get_rowkey_type(char **name, char **type)
294 {
295 if (cf_def.__isset.key_validation_class)
296 *type= (char*)cf_def.key_validation_class.c_str();
297 else
298 *type= NULL;
299
300 if (cf_def.__isset.key_alias)
301 *name= (char*)cf_def.key_alias.c_str();
302 else
303 *name= NULL;
304 }
305
get_ddl_size()306 size_t Cassandra_se_impl::get_ddl_size()
307 {
308 return cf_def.column_metadata.size();
309 }
310
get_default_validator()311 const char* Cassandra_se_impl::get_default_validator()
312 {
313 return cf_def.default_validation_class.c_str();
314 }
315
316
317 /////////////////////////////////////////////////////////////////////////////
318 // Data writes
319 /////////////////////////////////////////////////////////////////////////////
get_i64_timestamp()320 int64_t Cassandra_se_impl::get_i64_timestamp()
321 {
322 struct timeval td;
323 gettimeofday(&td, NULL);
324 int64_t ms = td.tv_sec;
325 ms = ms * 1000;
326 int64_t usec = td.tv_usec;
327 usec = usec / 1000;
328 ms += usec;
329
330 return ms;
331 }
332
333
clear_insert_buffer()334 void Cassandra_se_impl::clear_insert_buffer()
335 {
336 batch_mutation.clear();
337 }
338
339
start_row_insert(const char * key,int key_len)340 void Cassandra_se_impl::start_row_insert(const char *key, int key_len)
341 {
342 std::string key_to_insert;
343 key_to_insert.assign(key, key_len);
344 batch_mutation[key_to_insert]= ColumnFamilyToMutation();
345 ColumnFamilyToMutation& cf_mut= batch_mutation[key_to_insert];
346
347 cf_mut[column_family]= std::vector<Mutation>();
348 insert_list= &cf_mut[column_family];
349
350 insert_timestamp= get_i64_timestamp();
351 }
352
353
add_row_deletion(const char * key,int key_len,Column_name_enumerator * col_names,LEX_STRING * names,uint nnames)354 void Cassandra_se_impl::add_row_deletion(const char *key, int key_len,
355 Column_name_enumerator *col_names,
356 LEX_STRING *names, uint nnames)
357 {
358 std::string key_to_delete;
359 key_to_delete.assign(key, key_len);
360
361 batch_mutation[key_to_delete]= ColumnFamilyToMutation();
362 ColumnFamilyToMutation& cf_mut= batch_mutation[key_to_delete];
363
364 cf_mut[column_family]= std::vector<Mutation>();
365 std::vector<Mutation> &mutation_list= cf_mut[column_family];
366
367 Mutation mut;
368 mut.__isset.deletion= true;
369 mut.deletion.__isset.timestamp= true;
370 mut.deletion.timestamp= get_i64_timestamp();
371 mut.deletion.__isset.predicate= true;
372
373 /*
374 Attempting to delete columns with SliceRange causes exception with message
375 "Deletion does not yet support SliceRange predicates".
376
377 Delete all columns individually.
378 */
379 SlicePredicate slice_pred;
380 slice_pred.__isset.column_names= true;
381 const char *col_name;
382 while ((col_name= col_names->get_next_name()))
383 slice_pred.column_names.push_back(std::string(col_name));
384 for (uint i= 0; i < nnames; i++)
385 slice_pred.column_names.push_back(std::string(names[i].str,
386 names[i].length));
387
388 mut.deletion.predicate= slice_pred;
389
390 mutation_list.push_back(mut);
391 }
392
393
add_insert_column(const char * name,int name_len,const char * value,int value_len)394 void Cassandra_se_impl::add_insert_column(const char *name,
395 int name_len,
396 const char *value,
397 int value_len)
398 {
399 Mutation mut;
400 mut.__isset.column_or_supercolumn= true;
401 mut.column_or_supercolumn.__isset.column= true;
402
403 Column& col=mut.column_or_supercolumn.column;
404 if (name_len)
405 col.name.assign(name, name_len);
406 else
407 col.name.assign(name);
408 col.value.assign(value, value_len);
409 col.timestamp= insert_timestamp;
410 col.__isset.value= true;
411 col.__isset.timestamp= true;
412 insert_list->push_back(mut);
413 }
414
add_insert_delete_column(const char * name,int name_len)415 void Cassandra_se_impl::add_insert_delete_column(const char *name,
416 int name_len)
417 {
418 Mutation mut;
419 mut.__isset.deletion= true;
420 mut.deletion.__isset.timestamp= true;
421 mut.deletion.timestamp= insert_timestamp;
422 mut.deletion.__isset.predicate= true;
423
424 SlicePredicate slice_pred;
425 slice_pred.__isset.column_names= true;
426 slice_pred.column_names.push_back(std::string(name, name_len));
427 mut.deletion.predicate= slice_pred;
428
429 insert_list->push_back(mut);
430 }
431
432
retryable_do_insert()433 bool Cassandra_se_impl::retryable_do_insert()
434 {
435 cass->batch_mutate(batch_mutation, write_consistency);
436
437 cassandra_counters.row_inserts+= batch_mutation.size();
438 cassandra_counters.row_insert_batches++;
439
440 clear_insert_buffer();
441 return 0;
442 }
443
444
do_insert()445 bool Cassandra_se_impl::do_insert()
446 {
447 /*
448 zero-size mutations are allowed by Cassandra's batch_mutate but lets not
449 do them (we may attempt to do it if there is a bulk insert that stores
450 exactly @@cassandra_insert_batch_size*n elements.
451 */
452 if (batch_mutation.empty())
453 return false;
454
455 return try_operation(&Cassandra_se_impl::retryable_do_insert);
456 }
457
458
459 /////////////////////////////////////////////////////////////////////////////
460 // Reading data
461 /////////////////////////////////////////////////////////////////////////////
462
463 /*
464 Make one key lookup. If the record is found, the result is stored locally and
465 the caller should iterate over it.
466 */
467
get_slice(char * key,size_t key_len,bool * found)468 bool Cassandra_se_impl::get_slice(char *key, size_t key_len, bool *found)
469 {
470 bool res;
471 rowkey.assign(key, key_len);
472
473 if (!(res= try_operation(&Cassandra_se_impl::retryable_get_slice)))
474 *found= get_slice_found_rows;
475 return res;
476 }
477
478
retryable_get_slice()479 bool Cassandra_se_impl::retryable_get_slice()
480 {
481 ColumnParent cparent;
482 cparent.column_family= column_family;
483
484 SlicePredicate slice_pred;
485 SliceRange sr;
486 sr.start = "";
487 sr.finish = "";
488 slice_pred.__set_slice_range(sr);
489
490 cass->get_slice(column_data_vec, rowkey, cparent, slice_pred,
491 read_consistency);
492
493 if (column_data_vec.size() == 0)
494 {
495 /*
496 No columns found. Cassandra doesn't allow records without any column =>
497 this means the seach key doesn't exist
498 */
499 get_slice_found_rows= false;
500 return false;
501 }
502 get_slice_found_rows= true;
503
504 column_data_it= column_data_vec.begin();
505 return false;
506 }
507
508
get_next_read_column(char ** name,int * name_len,char ** value,int * value_len)509 bool Cassandra_se_impl::get_next_read_column(char **name, int *name_len,
510 char **value, int *value_len)
511 {
512 bool use_counter=false;
513 while (1)
514 {
515 if (column_data_it == column_data_vec.end())
516 return true;
517
518 if ((*column_data_it).__isset.column)
519 break; /* Ok it's a real column. Should be always the case. */
520
521 if ((*column_data_it).__isset.counter_column)
522 {
523 use_counter= true;
524 break;
525 }
526
527 column_data_it++;
528 }
529
530 ColumnOrSuperColumn& cs= *column_data_it;
531 if (use_counter)
532 {
533 *name_len= cs.counter_column.name.size();
534 *name= (char*)cs.counter_column.name.c_str();
535 *value= (char*)&cs.counter_column.value;
536 *value_len= sizeof(cs.counter_column.value);
537 }
538 else
539 {
540 *name_len= cs.column.name.size();
541 *name= (char*)cs.column.name.c_str();
542 *value= (char*)cs.column.value.c_str();
543 *value_len= cs.column.value.length();
544 }
545
546 column_data_it++;
547 return false;
548 }
549
550
551 /* Return the rowkey for the record that was read */
552
get_read_rowkey(char ** value,int * value_len)553 void Cassandra_se_impl::get_read_rowkey(char **value, int *value_len)
554 {
555 *value= (char*)rowkey.c_str();
556 *value_len= rowkey.length();
557 }
558
559
get_range_slices(bool last_key_as_start_key)560 bool Cassandra_se_impl::get_range_slices(bool last_key_as_start_key)
561 {
562 get_range_slices_param_last_key_as_start_key= last_key_as_start_key;
563
564 return try_operation(&Cassandra_se_impl::retryable_get_range_slices);
565 }
566
567
retryable_get_range_slices()568 bool Cassandra_se_impl::retryable_get_range_slices()
569 {
570 bool last_key_as_start_key= get_range_slices_param_last_key_as_start_key;
571
572 ColumnParent cparent;
573 cparent.column_family= column_family;
574
575 /* SlicePredicate can be used to limit columns we will retrieve */
576
577 KeyRange key_range;
578 key_range.__isset.start_key= true;
579 key_range.__isset.end_key= true;
580
581 if (last_key_as_start_key)
582 {
583 key_range.start_key= rowkey;
584
585 have_rowkey_to_skip= true;
586 rowkey_to_skip= rowkey;
587 }
588 else
589 {
590 have_rowkey_to_skip= false;
591 key_range.start_key.assign("", 0);
592 }
593
594 key_range.end_key.assign("", 0);
595 key_range.count= read_batch_size;
596
597 cass->get_range_slices(key_slice_vec, cparent, slice_pred, key_range,
598 read_consistency);
599
600 if (key_slice_vec.size() < (uint)read_batch_size)
601 get_slices_returned_less= true;
602 else
603 get_slices_returned_less= false;
604
605 key_slice_it= key_slice_vec.begin();
606 return false;
607 }
608
609
610 /* Switch to next row. This may produce an error */
get_next_range_slice_row(bool * eof)611 bool Cassandra_se_impl::get_next_range_slice_row(bool *eof)
612 {
613 restart:
614 if (key_slice_it == key_slice_vec.end())
615 {
616 if (get_slices_returned_less)
617 {
618 *eof= true;
619 return false;
620 }
621
622 /*
623 We have read through all columns in this batch. Try getting the next
624 batch.
625 */
626 if (get_range_slices(true))
627 return true;
628
629 if (key_slice_vec.empty())
630 {
631 *eof= true;
632 return false;
633 }
634 }
635
636 /*
637 (1) - skip the last row that we have read in the previous batch.
638 (2) - Rows that were deleted show up as rows without any columns. Skip
639 them, like CQL does.
640 */
641 if ((have_rowkey_to_skip && !rowkey_to_skip.compare(key_slice_it->key)) || // (1)
642 key_slice_it->columns.size() == 0) // (2)
643 {
644 key_slice_it++;
645 goto restart;
646 }
647
648 *eof= false;
649 column_data_vec= key_slice_it->columns;
650 rowkey= key_slice_it->key;
651 column_data_it= column_data_vec.begin();
652 key_slice_it++;
653 return false;
654 }
655
656
finish_reading_range_slices()657 void Cassandra_se_impl::finish_reading_range_slices()
658 {
659 key_slice_vec.clear();
660 }
661
662
clear_read_columns()663 void Cassandra_se_impl::clear_read_columns()
664 {
665 slice_pred.column_names.clear();
666 }
667
clear_read_all_columns()668 void Cassandra_se_impl::clear_read_all_columns()
669 {
670 slice_pred_sr.start = "";
671 slice_pred_sr.finish = "";
672 slice_pred.__set_slice_range(slice_pred_sr);
673 }
674
675
add_read_column(const char * name_arg)676 void Cassandra_se_impl::add_read_column(const char *name_arg)
677 {
678 std::string name(name_arg);
679 slice_pred.__isset.column_names= true;
680 slice_pred.column_names.push_back(name);
681 }
682
683
truncate()684 bool Cassandra_se_impl::truncate()
685 {
686 return try_operation(&Cassandra_se_impl::retryable_truncate);
687 }
688
689
retryable_truncate()690 bool Cassandra_se_impl::retryable_truncate()
691 {
692 cass->truncate(column_family);
693 return 0;
694 }
695
696
remove_row()697 bool Cassandra_se_impl::remove_row()
698 {
699 return try_operation(&Cassandra_se_impl::retryable_remove_row);
700 }
701
702
retryable_remove_row()703 bool Cassandra_se_impl::retryable_remove_row()
704 {
705 ColumnPath column_path;
706 column_path.column_family= column_family;
707 cass->remove(rowkey, column_path, get_i64_timestamp(), write_consistency);
708 return 0;
709 }
710
711 /*
712 Try calling a function, catching possible Cassandra errors, and re-trying
713 for "transient" errors.
714 */
try_operation(retryable_func_t func_to_call)715 bool Cassandra_se_impl::try_operation(retryable_func_t func_to_call)
716 {
717 bool res;
718 int n_attempts= thrift_call_retries_to_do;
719
720 bool was_inside_try_operation= inside_try_operation;
721 inside_try_operation= true;
722
723 do
724 {
725 res= true;
726
727 try {
728
729 if ((res= (this->*func_to_call)()))
730 {
731 /*
732 The function call was made successfully (without timeouts, etc),
733 but something inside it returned 'true'.
734 This is supposedly a failure (or "not found" or other negative
735 result). We need to return this to the caller.
736 */
737 n_attempts= 0;
738 }
739
740 } catch (InvalidRequestException ire) {
741 n_attempts= 0; /* there is no point in retrying this operation */
742 print_error("%s [%s]", ire.what(), ire.why.c_str());
743 } catch (UnavailableException ue) {
744 cassandra_counters.unavailable_exceptions++;
745 if (!--n_attempts)
746 print_error("UnavailableException: %s", ue.what());
747 } catch (TimedOutException te) {
748 /*
749 Note: this is a timeout generated *inside Cassandra cluster*.
750 Connection between us and the cluster is ok, but something went wrong
751 within the cluster.
752 */
753 cassandra_counters.timeout_exceptions++;
754 if (!--n_attempts)
755 print_error("TimedOutException: %s", te.what());
756 } catch (TTransportException tte) {
757 /* Something went wrong in communication between us and Cassandra */
758 cassandra_counters.network_exceptions++;
759
760 switch (tte.getType())
761 {
762 case TTransportException::NOT_OPEN:
763 case TTransportException::TIMED_OUT:
764 case TTransportException::END_OF_FILE:
765 case TTransportException::INTERRUPTED:
766 {
767 if (!was_inside_try_operation && reconnect())
768 {
769 /* Failed to reconnect, no point to retry the operation */
770 n_attempts= 0;
771 print_error("%s", tte.what());
772 }
773 else
774 {
775 n_attempts--;
776 }
777 break;
778 }
779 default:
780 {
781 /*
782 We assume it doesn't make sense to retry for
783 unknown kinds of TTransportException-s
784 */
785 n_attempts= 0;
786 print_error("%s", tte.what());
787 }
788 }
789 }catch(TException e){
790 /* todo: we may use retry for certain kinds of Thrift errors */
791 n_attempts= 0;
792 print_error("Thrift exception: %s", e.what());
793 } catch (...) {
794 n_attempts= 0; /* Don't retry */
795 print_error("Unknown exception");
796 }
797
798 } while (res && n_attempts > 0);
799
800 inside_try_operation= was_inside_try_operation;
801 return res;
802 }
803
804 /////////////////////////////////////////////////////////////////////////////
805 // MRR reads
806 /////////////////////////////////////////////////////////////////////////////
807
new_lookup_keys()808 void Cassandra_se_impl::new_lookup_keys()
809 {
810 mrr_keys.clear();
811 }
812
813
add_lookup_key(const char * key,size_t key_len)814 int Cassandra_se_impl::add_lookup_key(const char *key, size_t key_len)
815 {
816 mrr_keys.push_back(std::string(key, key_len));
817 return mrr_keys.size();
818 }
819
multiget_slice()820 bool Cassandra_se_impl::multiget_slice()
821 {
822 return try_operation(&Cassandra_se_impl::retryable_multiget_slice);
823 }
824
825
retryable_multiget_slice()826 bool Cassandra_se_impl::retryable_multiget_slice()
827 {
828 ColumnParent cparent;
829 cparent.column_family= column_family;
830
831 SlicePredicate slice_pred;
832 SliceRange sr;
833 sr.start = "";
834 sr.finish = "";
835 slice_pred.__set_slice_range(sr);
836
837 cassandra_counters.multiget_reads++;
838 cassandra_counters.multiget_keys_scanned += mrr_keys.size();
839 cass->multiget_slice(mrr_result, mrr_keys, cparent, slice_pred,
840 read_consistency);
841
842 cassandra_counters.multiget_rows_read += mrr_result.size();
843 mrr_result_it= mrr_result.begin();
844
845 return false;
846 }
847
848
get_next_multiget_row()849 bool Cassandra_se_impl::get_next_multiget_row()
850 {
851 if (mrr_result_it == mrr_result.end())
852 return true; /* EOF */
853
854 column_data_vec= mrr_result_it->second;
855 rowkey= mrr_result_it->first;
856
857 column_data_it= column_data_vec.begin();
858 mrr_result_it++;
859 return false;
860 }
861
862
863
864