1 
2 // Cassandra includes:
3 #include <inttypes.h>
4 #include <netinet/in.h>
5 #include <sys/time.h>
6 #include <stdio.h>
7 #include <stdarg.h>
8 
9 #include "thrift/Thrift.h"
10 #include "thrift/transport/TSocket.h"
11 #include "thrift/transport/TTransport.h"
12 #include "thrift/transport/TBufferTransports.h"
13 #include "thrift/protocol/TProtocol.h"
14 #include "thrift/protocol/TBinaryProtocol.h"
15 #include "gen-cpp/Cassandra.h"
16 // cassandra includes end
17 
18 #include "cassandra_se.h"
19 
/*
  Pointer-plus-length string descriptor: 'str' points at the character data
  and 'length' is kept alongside it as a separate size_t.
*/
struct st_mysql_lex_string
{
  char *str;      /* start of the character data */
  size_t length;  /* byte count associated with str */
};
25 
26 using namespace std;
27 using namespace apache::thrift;
28 using namespace apache::thrift::transport;
29 using namespace apache::thrift::protocol;
30 using namespace org::apache::cassandra;
31 
32 
33 /*
34   Implementation of connection to one Cassandra column family (ie., table)
35 */
36 class Cassandra_se_impl: public Cassandra_se_interface
37 {
38   CassandraClient *cass; /* Connection to cassandra */
39 
40   std::string column_family;
41   std::string keyspace;
42 
43   ConsistencyLevel::type write_consistency;
44   ConsistencyLevel::type read_consistency;
45 
46   /* Connection data */
47   std::string host;
48   int port;
49   /* How many times to retry an operation before giving up */
50   int thrift_call_retries_to_do;
51 
52   bool inside_try_operation;
53 
54   /* DDL data */
55   KsDef ks_def; /* KeySpace we're using (TODO: put this in table->share) */
56   CfDef cf_def; /* Column family we're using (TODO: put in table->share)*/
57   std::vector<ColumnDef>::iterator column_ddl_it;
58 
59   /* The list that was returned by the last key lookup */
60   std::vector<ColumnOrSuperColumn> column_data_vec;
61   std::vector<ColumnOrSuperColumn>::iterator column_data_it;
62 
63   /* Insert preparation */
64   typedef std::map<std::string, std::vector<Mutation> > ColumnFamilyToMutation;
65   typedef std::map<std::string,  ColumnFamilyToMutation> KeyToCfMutationMap;
66 
67   KeyToCfMutationMap batch_mutation; /* Prepare operation here */
68   int64_t insert_timestamp;
69   std::vector<Mutation>* insert_list;
70 
71   /* Resultset we're reading */
72   std::vector<KeySlice> key_slice_vec;
73   std::vector<KeySlice>::iterator key_slice_it;
74 
75   std::string rowkey; /* key of the record we're returning now */
76 
77   SlicePredicate slice_pred;
78   SliceRange slice_pred_sr;
79   bool get_slices_returned_less;
80   bool get_slice_found_rows;
81 
82   bool reconnect();
83 public:
Cassandra_se_impl()84   Cassandra_se_impl() : cass(NULL),
85                         write_consistency(ConsistencyLevel::ONE),
86                         read_consistency(ConsistencyLevel::ONE),
87                         thrift_call_retries_to_do(1),
88                         inside_try_operation(false)
89                         {}
~Cassandra_se_impl()90   virtual ~Cassandra_se_impl(){ delete cass; }
91 
92   /* Connection and DDL checks */
93   bool connect(const char *host_arg, int port_arg, const char *keyspace);
set_column_family(const char * cfname)94   void set_column_family(const char *cfname) { column_family.assign(cfname); }
95 
96   bool setup_ddl_checks();
97   void first_ddl_column();
98   bool next_ddl_column(char **name, int *name_len, char **value, int *value_len);
99   void get_rowkey_type(char **name, char **type);
100   size_t get_ddl_size();
101   const char* get_default_validator();
102 
103   /* Settings */
104   void set_consistency_levels(unsigned long read_cons_level, unsigned long write_cons_level);
set_n_retries(uint retries_arg)105   virtual void set_n_retries(uint retries_arg) {
106     thrift_call_retries_to_do= retries_arg;
107   }
108 
109   /* Writes */
110   void clear_insert_buffer();
111   void start_row_insert(const char *key, int key_len);
112   void add_insert_column(const char *name, int name_len,
113                          const char *value, int value_len);
114   void add_insert_delete_column(const char *name, int name_len);
115   void add_row_deletion(const char *key, int key_len,
116                         Column_name_enumerator *col_names,
117                         LEX_STRING *names, uint nnames);
118 
119   bool do_insert();
120 
121   /* Reads, point lookups */
122   bool get_slice(char *key, size_t key_len, bool *found);
123   bool get_next_read_column(char **name, int *name_len,
124                             char **value, int *value_len );
125   void get_read_rowkey(char **value, int *value_len);
126 
127   /* Reads, multi-row scans */
128 private:
129   bool have_rowkey_to_skip;
130   std::string rowkey_to_skip;
131 
132   bool get_range_slices_param_last_key_as_start_key;
133 public:
134   bool get_range_slices(bool last_key_as_start_key);
135   void finish_reading_range_slices();
136   bool get_next_range_slice_row(bool *eof);
137 
138   /* Setup that's necessary before a multi-row read. (todo: use it before point lookups, too) */
139   void clear_read_columns();
140   void clear_read_all_columns();
141   void add_read_column(const char *name);
142 
143   /* Reads, MRR scans */
144   void new_lookup_keys();
145   int  add_lookup_key(const char *key, size_t key_len);
146   bool multiget_slice();
147 
148   bool get_next_multiget_row();
149 
150   bool truncate();
151 
152   bool remove_row();
153 
154 private:
155   bool retryable_truncate();
156   bool retryable_do_insert();
157   bool retryable_remove_row();
158   bool retryable_setup_ddl_checks();
159   bool retryable_multiget_slice();
160   bool retryable_get_range_slices();
161   bool retryable_get_slice();
162 
163   std::vector<std::string> mrr_keys; /* can we use allocator to put these into MRR buffer? */
164   std::map<std::string, std::vector<ColumnOrSuperColumn> > mrr_result;
165   std::map<std::string, std::vector<ColumnOrSuperColumn> >::iterator mrr_result_it;
166 
167   /* Non-inherited utility functions: */
168   int64_t get_i64_timestamp();
169 
170   typedef bool (Cassandra_se_impl::*retryable_func_t)();
171   bool try_operation(retryable_func_t func);
172 };
173 
174 
175 /////////////////////////////////////////////////////////////////////////////
176 // Connection and setup
177 /////////////////////////////////////////////////////////////////////////////
create_cassandra_se()178 Cassandra_se_interface *create_cassandra_se()
179 {
180   return new Cassandra_se_impl;
181 }
182 
183 
connect(const char * host_arg,int port_arg,const char * keyspace_arg)184 bool Cassandra_se_impl::connect(const char *host_arg, int port_arg, const char *keyspace_arg)
185 {
186   keyspace.assign(keyspace_arg);
187   host.assign(host_arg);
188   port= port_arg;
189   return reconnect();
190 }
191 
192 
reconnect()193 bool Cassandra_se_impl::reconnect()
194 {
195 
196   delete cass;
197   cass= NULL;
198 
199   bool res= true;
200   try {
201     boost::shared_ptr<TTransport> socket =
202       boost::shared_ptr<TSocket>(new TSocket(host.c_str(), port));
203     boost::shared_ptr<TTransport> tr =
204       boost::shared_ptr<TFramedTransport>(new TFramedTransport (socket));
205     boost::shared_ptr<TProtocol> p =
206       boost::shared_ptr<TBinaryProtocol>(new TBinaryProtocol(tr));
207 
208     cass= new CassandraClient(p);
209     tr->open();
210     cass->set_keyspace(keyspace.c_str());
211 
212     res= false; // success
213   }catch(TTransportException te){
214     print_error("%s [%d]", te.what(), te.getType());
215   }catch(InvalidRequestException ire){
216     print_error("%s [%s]", ire.what(), ire.why.c_str());
217   }catch(NotFoundException nfe){
218     print_error("%s", nfe.what());
219   }catch(TException e){
220     print_error("Thrift exception: %s", e.what());
221   }catch (...) {
222     print_error("Unknown exception");
223   }
224 
225   if (!res && setup_ddl_checks())
226     res= true;
227   return res;
228 }
229 
230 
set_consistency_levels(unsigned long read_cons_level,unsigned long write_cons_level)231 void Cassandra_se_impl::set_consistency_levels(unsigned long read_cons_level,
232                                                unsigned long write_cons_level)
233 {
234   write_consistency= (ConsistencyLevel::type)(write_cons_level + 1);
235   read_consistency=  (ConsistencyLevel::type)(read_cons_level + 1);
236 }
237 
238 
retryable_setup_ddl_checks()239 bool Cassandra_se_impl::retryable_setup_ddl_checks()
240 {
241   try {
242 
243     cass->describe_keyspace(ks_def, keyspace);
244 
245   } catch (NotFoundException nfe) {
246     print_error("keyspace `%s` not found: %s", keyspace.c_str(), nfe.what());
247     return true;
248   }
249 
250   std::vector<CfDef>::iterator it;
251   for (it= ks_def.cf_defs.begin(); it < ks_def.cf_defs.end(); it++)
252   {
253     cf_def= *it;
254     if (!cf_def.name.compare(column_family))
255       return false;
256   }
257 
258   print_error("Column family %s not found in keyspace %s",
259                column_family.c_str(),
260                keyspace.c_str());
261   return true;
262 }
263 
setup_ddl_checks()264 bool Cassandra_se_impl::setup_ddl_checks()
265 {
266   return try_operation(&Cassandra_se_impl::retryable_setup_ddl_checks);
267 }
268 
269 
first_ddl_column()270 void Cassandra_se_impl::first_ddl_column()
271 {
272   column_ddl_it= cf_def.column_metadata.begin();
273 }
274 
275 
next_ddl_column(char ** name,int * name_len,char ** type,int * type_len)276 bool Cassandra_se_impl::next_ddl_column(char **name, int *name_len,
277                                         char **type, int *type_len)
278 {
279   if (column_ddl_it == cf_def.column_metadata.end())
280     return true;
281 
282   *name= (char*)(*column_ddl_it).name.c_str();
283   *name_len= (*column_ddl_it).name.length();
284 
285   *type= (char*)(*column_ddl_it).validation_class.c_str();
286   *type_len= (*column_ddl_it).validation_class.length();
287 
288   column_ddl_it++;
289   return false;
290 }
291 
292 
get_rowkey_type(char ** name,char ** type)293 void Cassandra_se_impl::get_rowkey_type(char **name, char **type)
294 {
295   if (cf_def.__isset.key_validation_class)
296     *type= (char*)cf_def.key_validation_class.c_str();
297   else
298     *type= NULL;
299 
300   if (cf_def.__isset.key_alias)
301     *name= (char*)cf_def.key_alias.c_str();
302   else
303     *name= NULL;
304 }
305 
get_ddl_size()306 size_t Cassandra_se_impl::get_ddl_size()
307 {
308   return cf_def.column_metadata.size();
309 }
310 
get_default_validator()311 const char* Cassandra_se_impl::get_default_validator()
312 {
313   return cf_def.default_validation_class.c_str();
314 }
315 
316 
317 /////////////////////////////////////////////////////////////////////////////
318 // Data writes
319 /////////////////////////////////////////////////////////////////////////////
get_i64_timestamp()320 int64_t Cassandra_se_impl::get_i64_timestamp()
321 {
322   struct timeval td;
323   gettimeofday(&td, NULL);
324   int64_t ms = td.tv_sec;
325   ms = ms * 1000;
326   int64_t usec = td.tv_usec;
327   usec = usec / 1000;
328   ms += usec;
329 
330   return ms;
331 }
332 
333 
clear_insert_buffer()334 void Cassandra_se_impl::clear_insert_buffer()
335 {
336   batch_mutation.clear();
337 }
338 
339 
start_row_insert(const char * key,int key_len)340 void Cassandra_se_impl::start_row_insert(const char *key, int key_len)
341 {
342   std::string key_to_insert;
343   key_to_insert.assign(key, key_len);
344   batch_mutation[key_to_insert]= ColumnFamilyToMutation();
345   ColumnFamilyToMutation& cf_mut= batch_mutation[key_to_insert];
346 
347   cf_mut[column_family]= std::vector<Mutation>();
348   insert_list= &cf_mut[column_family];
349 
350   insert_timestamp= get_i64_timestamp();
351 }
352 
353 
add_row_deletion(const char * key,int key_len,Column_name_enumerator * col_names,LEX_STRING * names,uint nnames)354 void Cassandra_se_impl::add_row_deletion(const char *key, int key_len,
355                                          Column_name_enumerator *col_names,
356                                          LEX_STRING *names, uint nnames)
357 {
358   std::string key_to_delete;
359   key_to_delete.assign(key, key_len);
360 
361   batch_mutation[key_to_delete]= ColumnFamilyToMutation();
362   ColumnFamilyToMutation& cf_mut= batch_mutation[key_to_delete];
363 
364   cf_mut[column_family]= std::vector<Mutation>();
365   std::vector<Mutation> &mutation_list= cf_mut[column_family];
366 
367   Mutation mut;
368   mut.__isset.deletion= true;
369   mut.deletion.__isset.timestamp= true;
370   mut.deletion.timestamp= get_i64_timestamp();
371   mut.deletion.__isset.predicate= true;
372 
373   /*
374     Attempting to delete columns with SliceRange causes exception with message
375     "Deletion does not yet support SliceRange predicates".
376 
377     Delete all columns individually.
378   */
379   SlicePredicate slice_pred;
380   slice_pred.__isset.column_names= true;
381   const char *col_name;
382   while ((col_name= col_names->get_next_name()))
383     slice_pred.column_names.push_back(std::string(col_name));
384   for (uint i= 0; i < nnames; i++)
385     slice_pred.column_names.push_back(std::string(names[i].str,
386                                                   names[i].length));
387 
388   mut.deletion.predicate= slice_pred;
389 
390   mutation_list.push_back(mut);
391 }
392 
393 
add_insert_column(const char * name,int name_len,const char * value,int value_len)394 void Cassandra_se_impl::add_insert_column(const char *name,
395                                           int name_len,
396                                           const char *value,
397                                           int value_len)
398 {
399   Mutation mut;
400   mut.__isset.column_or_supercolumn= true;
401   mut.column_or_supercolumn.__isset.column= true;
402 
403   Column& col=mut.column_or_supercolumn.column;
404   if (name_len)
405     col.name.assign(name, name_len);
406   else
407     col.name.assign(name);
408   col.value.assign(value, value_len);
409   col.timestamp= insert_timestamp;
410   col.__isset.value= true;
411   col.__isset.timestamp= true;
412   insert_list->push_back(mut);
413 }
414 
add_insert_delete_column(const char * name,int name_len)415 void Cassandra_se_impl::add_insert_delete_column(const char *name,
416                                                  int name_len)
417 {
418   Mutation mut;
419   mut.__isset.deletion= true;
420   mut.deletion.__isset.timestamp= true;
421   mut.deletion.timestamp= insert_timestamp;
422   mut.deletion.__isset.predicate= true;
423 
424   SlicePredicate slice_pred;
425   slice_pred.__isset.column_names= true;
426   slice_pred.column_names.push_back(std::string(name, name_len));
427   mut.deletion.predicate= slice_pred;
428 
429   insert_list->push_back(mut);
430 }
431 
432 
retryable_do_insert()433 bool Cassandra_se_impl::retryable_do_insert()
434 {
435   cass->batch_mutate(batch_mutation, write_consistency);
436 
437   cassandra_counters.row_inserts+= batch_mutation.size();
438   cassandra_counters.row_insert_batches++;
439 
440   clear_insert_buffer();
441   return 0;
442 }
443 
444 
do_insert()445 bool Cassandra_se_impl::do_insert()
446 {
447   /*
448     zero-size mutations are allowed by Cassandra's batch_mutate but lets not
449     do them (we may attempt to do it if there is a bulk insert that stores
450     exactly @@cassandra_insert_batch_size*n elements.
451   */
452   if (batch_mutation.empty())
453     return false;
454 
455   return try_operation(&Cassandra_se_impl::retryable_do_insert);
456 }
457 
458 
459 /////////////////////////////////////////////////////////////////////////////
460 // Reading data
461 /////////////////////////////////////////////////////////////////////////////
462 
463 /*
464   Make one key lookup. If the record is found, the result is stored locally and
465   the caller should iterate over it.
466 */
467 
get_slice(char * key,size_t key_len,bool * found)468 bool Cassandra_se_impl::get_slice(char *key, size_t key_len, bool *found)
469 {
470   bool res;
471   rowkey.assign(key, key_len);
472 
473   if (!(res= try_operation(&Cassandra_se_impl::retryable_get_slice)))
474     *found= get_slice_found_rows;
475   return res;
476 }
477 
478 
retryable_get_slice()479 bool Cassandra_se_impl::retryable_get_slice()
480 {
481   ColumnParent cparent;
482   cparent.column_family= column_family;
483 
484   SlicePredicate slice_pred;
485   SliceRange sr;
486   sr.start = "";
487   sr.finish = "";
488   slice_pred.__set_slice_range(sr);
489 
490   cass->get_slice(column_data_vec, rowkey, cparent, slice_pred,
491                   read_consistency);
492 
493   if (column_data_vec.size() == 0)
494   {
495     /*
496       No columns found. Cassandra doesn't allow records without any column =>
497       this means the seach key doesn't exist
498     */
499     get_slice_found_rows= false;
500     return false;
501   }
502   get_slice_found_rows= true;
503 
504   column_data_it= column_data_vec.begin();
505   return false;
506 }
507 
508 
get_next_read_column(char ** name,int * name_len,char ** value,int * value_len)509 bool Cassandra_se_impl::get_next_read_column(char **name, int *name_len,
510                                              char **value, int *value_len)
511 {
512   bool use_counter=false;
513   while (1)
514   {
515     if (column_data_it == column_data_vec.end())
516       return true;
517 
518     if ((*column_data_it).__isset.column)
519       break; /* Ok it's a real column. Should be always the case. */
520 
521     if ((*column_data_it).__isset.counter_column)
522     {
523       use_counter= true;
524       break;
525     }
526 
527     column_data_it++;
528   }
529 
530   ColumnOrSuperColumn& cs= *column_data_it;
531   if (use_counter)
532   {
533     *name_len= cs.counter_column.name.size();
534     *name= (char*)cs.counter_column.name.c_str();
535     *value= (char*)&cs.counter_column.value;
536     *value_len= sizeof(cs.counter_column.value);
537   }
538   else
539   {
540     *name_len= cs.column.name.size();
541     *name= (char*)cs.column.name.c_str();
542     *value= (char*)cs.column.value.c_str();
543     *value_len= cs.column.value.length();
544   }
545 
546   column_data_it++;
547   return false;
548 }
549 
550 
551 /* Return the rowkey for the record that was read */
552 
get_read_rowkey(char ** value,int * value_len)553 void Cassandra_se_impl::get_read_rowkey(char **value, int *value_len)
554 {
555   *value= (char*)rowkey.c_str();
556   *value_len= rowkey.length();
557 }
558 
559 
get_range_slices(bool last_key_as_start_key)560 bool Cassandra_se_impl::get_range_slices(bool last_key_as_start_key)
561 {
562   get_range_slices_param_last_key_as_start_key= last_key_as_start_key;
563 
564   return try_operation(&Cassandra_se_impl::retryable_get_range_slices);
565 }
566 
567 
retryable_get_range_slices()568 bool Cassandra_se_impl::retryable_get_range_slices()
569 {
570   bool last_key_as_start_key= get_range_slices_param_last_key_as_start_key;
571 
572   ColumnParent cparent;
573   cparent.column_family= column_family;
574 
575   /* SlicePredicate can be used to limit columns we will retrieve */
576 
577   KeyRange key_range;
578   key_range.__isset.start_key= true;
579   key_range.__isset.end_key= true;
580 
581   if (last_key_as_start_key)
582   {
583     key_range.start_key= rowkey;
584 
585     have_rowkey_to_skip= true;
586     rowkey_to_skip= rowkey;
587   }
588   else
589   {
590     have_rowkey_to_skip= false;
591     key_range.start_key.assign("", 0);
592   }
593 
594   key_range.end_key.assign("", 0);
595   key_range.count= read_batch_size;
596 
597   cass->get_range_slices(key_slice_vec, cparent, slice_pred, key_range,
598                          read_consistency);
599 
600   if (key_slice_vec.size() < (uint)read_batch_size)
601     get_slices_returned_less= true;
602   else
603     get_slices_returned_less= false;
604 
605   key_slice_it= key_slice_vec.begin();
606   return false;
607 }
608 
609 
610 /* Switch to next row. This may produce an error */
get_next_range_slice_row(bool * eof)611 bool Cassandra_se_impl::get_next_range_slice_row(bool *eof)
612 {
613 restart:
614   if (key_slice_it == key_slice_vec.end())
615   {
616     if (get_slices_returned_less)
617     {
618       *eof= true;
619       return false;
620     }
621 
622     /*
623       We have read through all columns in this batch. Try getting the next
624       batch.
625     */
626     if (get_range_slices(true))
627       return true;
628 
629     if (key_slice_vec.empty())
630     {
631       *eof= true;
632       return false;
633     }
634   }
635 
636   /*
637     (1) - skip the last row that we have read in the previous batch.
638     (2) - Rows that were deleted show up as rows without any columns. Skip
639           them, like CQL does.
640   */
641   if ((have_rowkey_to_skip && !rowkey_to_skip.compare(key_slice_it->key)) || // (1)
642       key_slice_it->columns.size() == 0) // (2)
643   {
644     key_slice_it++;
645     goto restart;
646   }
647 
648   *eof= false;
649   column_data_vec= key_slice_it->columns;
650   rowkey= key_slice_it->key;
651   column_data_it= column_data_vec.begin();
652   key_slice_it++;
653   return false;
654 }
655 
656 
finish_reading_range_slices()657 void Cassandra_se_impl::finish_reading_range_slices()
658 {
659   key_slice_vec.clear();
660 }
661 
662 
clear_read_columns()663 void Cassandra_se_impl::clear_read_columns()
664 {
665   slice_pred.column_names.clear();
666 }
667 
clear_read_all_columns()668 void Cassandra_se_impl::clear_read_all_columns()
669 {
670   slice_pred_sr.start = "";
671   slice_pred_sr.finish = "";
672   slice_pred.__set_slice_range(slice_pred_sr);
673 }
674 
675 
add_read_column(const char * name_arg)676 void Cassandra_se_impl::add_read_column(const char *name_arg)
677 {
678   std::string name(name_arg);
679   slice_pred.__isset.column_names= true;
680   slice_pred.column_names.push_back(name);
681 }
682 
683 
truncate()684 bool Cassandra_se_impl::truncate()
685 {
686   return try_operation(&Cassandra_se_impl::retryable_truncate);
687 }
688 
689 
retryable_truncate()690 bool Cassandra_se_impl::retryable_truncate()
691 {
692   cass->truncate(column_family);
693   return 0;
694 }
695 
696 
remove_row()697 bool Cassandra_se_impl::remove_row()
698 {
699   return try_operation(&Cassandra_se_impl::retryable_remove_row);
700 }
701 
702 
retryable_remove_row()703 bool Cassandra_se_impl::retryable_remove_row()
704 {
705   ColumnPath column_path;
706   column_path.column_family= column_family;
707   cass->remove(rowkey, column_path, get_i64_timestamp(), write_consistency);
708   return 0;
709 }
710 
711 /*
712   Try calling a function, catching possible Cassandra errors, and re-trying
713    for "transient" errors.
714 */
try_operation(retryable_func_t func_to_call)715 bool Cassandra_se_impl::try_operation(retryable_func_t func_to_call)
716 {
717   bool res;
718   int n_attempts= thrift_call_retries_to_do;
719 
720   bool was_inside_try_operation= inside_try_operation;
721   inside_try_operation= true;
722 
723   do
724   {
725     res= true;
726 
727     try {
728 
729       if ((res= (this->*func_to_call)()))
730       {
731         /*
732           The function call was made successfully (without timeouts, etc),
733           but something inside it returned 'true'.
734           This is supposedly a failure (or "not found" or other negative
735           result). We need to return this to the caller.
736         */
737         n_attempts= 0;
738       }
739 
740     } catch (InvalidRequestException ire) {
741       n_attempts= 0; /* there is no point in retrying this operation */
742       print_error("%s [%s]", ire.what(), ire.why.c_str());
743     } catch (UnavailableException ue) {
744       cassandra_counters.unavailable_exceptions++;
745       if (!--n_attempts)
746         print_error("UnavailableException: %s", ue.what());
747     } catch (TimedOutException te) {
748       /*
749         Note: this is a timeout generated *inside Cassandra cluster*.
750         Connection between us and the cluster is ok, but something went wrong
751         within the cluster.
752       */
753       cassandra_counters.timeout_exceptions++;
754       if (!--n_attempts)
755         print_error("TimedOutException: %s", te.what());
756     } catch (TTransportException tte) {
757       /* Something went wrong in communication between us and Cassandra */
758       cassandra_counters.network_exceptions++;
759 
760       switch (tte.getType())
761       {
762         case TTransportException::NOT_OPEN:
763         case TTransportException::TIMED_OUT:
764         case TTransportException::END_OF_FILE:
765         case TTransportException::INTERRUPTED:
766         {
767           if (!was_inside_try_operation && reconnect())
768           {
769             /* Failed to reconnect, no point to retry the operation */
770             n_attempts= 0;
771             print_error("%s", tte.what());
772           }
773           else
774           {
775             n_attempts--;
776           }
777           break;
778         }
779         default:
780         {
781           /*
782             We assume it doesn't make sense to retry for
783             unknown kinds of TTransportException-s
784           */
785           n_attempts= 0;
786           print_error("%s", tte.what());
787         }
788       }
789     }catch(TException e){
790       /* todo: we may use retry for certain kinds of Thrift errors */
791       n_attempts= 0;
792       print_error("Thrift exception: %s", e.what());
793     } catch (...) {
794       n_attempts= 0; /* Don't retry */
795       print_error("Unknown exception");
796     }
797 
798   } while (res && n_attempts > 0);
799 
800   inside_try_operation= was_inside_try_operation;
801   return res;
802 }
803 
804 /////////////////////////////////////////////////////////////////////////////
805 // MRR reads
806 /////////////////////////////////////////////////////////////////////////////
807 
new_lookup_keys()808 void Cassandra_se_impl::new_lookup_keys()
809 {
810   mrr_keys.clear();
811 }
812 
813 
add_lookup_key(const char * key,size_t key_len)814 int Cassandra_se_impl::add_lookup_key(const char *key, size_t key_len)
815 {
816   mrr_keys.push_back(std::string(key, key_len));
817   return mrr_keys.size();
818 }
819 
multiget_slice()820 bool Cassandra_se_impl::multiget_slice()
821 {
822   return try_operation(&Cassandra_se_impl::retryable_multiget_slice);
823 }
824 
825 
retryable_multiget_slice()826 bool Cassandra_se_impl::retryable_multiget_slice()
827 {
828   ColumnParent cparent;
829   cparent.column_family= column_family;
830 
831   SlicePredicate slice_pred;
832   SliceRange sr;
833   sr.start = "";
834   sr.finish = "";
835   slice_pred.__set_slice_range(sr);
836 
837   cassandra_counters.multiget_reads++;
838   cassandra_counters.multiget_keys_scanned += mrr_keys.size();
839   cass->multiget_slice(mrr_result, mrr_keys, cparent, slice_pred,
840                        read_consistency);
841 
842   cassandra_counters.multiget_rows_read += mrr_result.size();
843   mrr_result_it= mrr_result.begin();
844 
845   return false;
846 }
847 
848 
get_next_multiget_row()849 bool Cassandra_se_impl::get_next_multiget_row()
850 {
851   if (mrr_result_it == mrr_result.end())
852     return true; /* EOF */
853 
854   column_data_vec= mrr_result_it->second;
855   rowkey= mrr_result_it->first;
856 
857   column_data_it= column_data_vec.begin();
858   mrr_result_it++;
859   return false;
860 }
861 
862 
863 
864