1 /*
2   Copyright (c) DataStax, Inc.
3 
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7 
8   http://www.apache.org/licenses/LICENSE-2.0
9 
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15 */
16 
17 #ifndef DATASTAX_INTERNAL_TOKEN_MAP_IMPL_HPP
18 #define DATASTAX_INTERNAL_TOKEN_MAP_IMPL_HPP
19 
20 #include "collection_iterator.hpp"
21 #include "constants.hpp"
22 #include "dense_hash_map.hpp"
23 #include "dense_hash_set.hpp"
24 #include "deque.hpp"
25 #include "json.hpp"
26 #include "map_iterator.hpp"
27 #include "result_iterator.hpp"
28 #include "result_response.hpp"
29 #include "row.hpp"
30 #include "string_ref.hpp"
31 #include "token_map.hpp"
32 #include "value.hpp"
33 #include "vector.hpp"
34 
35 #include <algorithm>
36 #include <assert.h>
37 #include <iomanip>
38 #include <ios>
39 #include <uv.h>
40 
// Replication strategy class name suffixes. ReplicationStrategy::init() matches
// these against the end of a keyspace's strategy class name (via ends_with) to
// select the strategy type.
#define CASS_NETWORK_TOPOLOGY_STRATEGY "NetworkTopologyStrategy"
#define CASS_SIMPLE_STRATEGY "SimpleStrategy"
43 
44 namespace std {
45 
46 template <>
47 struct equal_to<datastax::internal::core::Host::Ptr> {
operator ()std::equal_to48   bool operator()(const datastax::internal::core::Host::Ptr& lhs,
49                   const datastax::internal::core::Host::Ptr& rhs) const {
50     if (lhs == rhs) {
51       return true;
52     }
53     if (!lhs || !rhs) {
54       return false;
55     }
56     return lhs->address() == rhs->address();
57   }
58 };
59 
60 #if defined(HASH_IN_TR1) && !defined(_WIN32)
61 namespace tr1 {
62 #endif
63 
64 template <>
65 struct hash<datastax::internal::core::Host::Ptr> {
operator ()std::tr1::hash66   std::size_t operator()(const datastax::internal::core::Host::Ptr& host) const {
67     if (!host) return 0;
68     return hasher(host->address());
69   }
70   SPARSEHASH_HASH<datastax::internal::core::Address> hasher;
71 };
72 
73 #if defined(HASH_IN_TR1) && !defined(_WIN32)
74 } // namespace tr1
75 #endif
76 
77 } // namespace std
78 
79 namespace datastax { namespace internal { namespace core {
80 
81 class IdGenerator {
82 public:
83   typedef DenseHashMap<String, uint32_t> IdMap;
84 
85   static const uint32_t EMPTY_KEY;
86   static const uint32_t DELETED_KEY;
87 
IdGenerator()88   IdGenerator() { ids_.set_empty_key(String()); }
89 
get(const String & key)90   uint32_t get(const String& key) {
91     if (key.empty()) {
92       return 0;
93     }
94 
95     IdMap::const_iterator i = ids_.find(key);
96     if (i != ids_.end()) {
97       return i->second;
98     }
99 
100     // This will never generate a 0 identifier. So 0 can be used as
101     // inalid or empty.
102     uint32_t id = ids_.size() + 1;
103     ids_[key] = id;
104     return id;
105   }
106 
107 private:
108   IdMap ids_;
109 };
110 
111 struct Murmur3Partitioner {
112   typedef int64_t Token;
113 
114   static Token from_string(const StringRef& str);
115   static Token hash(const StringRef& str);
namedatastax::internal::core::Murmur3Partitioner116   static StringRef name() { return "Murmur3Partitioner"; }
117 };
118 
119 struct RandomPartitioner {
120   struct Token {
121     uint64_t hi;
122     uint64_t lo;
123 
operator <datastax::internal::core::RandomPartitioner::Token124     bool operator<(const Token& other) const {
125       return hi == other.hi ? lo < other.lo : hi < other.hi;
126     }
127 
operator ==datastax::internal::core::RandomPartitioner::Token128     bool operator==(const Token& other) const { return hi == other.hi && lo == other.lo; }
129   };
130 
131   static Token abs(Token token);
132   static uint64_t encode(uint8_t* bytes);
133 
134   static Token from_string(const StringRef& str);
135   static Token hash(const StringRef& str);
namedatastax::internal::core::RandomPartitioner136   static StringRef name() { return "RandomPartitioner"; }
137 };
138 
139 class ByteOrderedPartitioner {
140 public:
141   typedef Vector<uint8_t> Token;
142 
143   static Token from_string(const StringRef& str);
144   static Token hash(const StringRef& str);
name()145   static StringRef name() { return "ByteOrderedPartitioner"; }
146 };
147 
operator <<(std::ostream & os,const RandomPartitioner::Token & token)148 inline std::ostream& operator<<(std::ostream& os, const RandomPartitioner::Token& token) {
149   os << std::setfill('0') << std::setw(16) << std::hex << token.hi << std::setfill('0')
150      << std::setw(16) << std::hex << token.lo;
151   return os;
152 }
153 
operator <<(std::ostream & os,const ByteOrderedPartitioner::Token & token)154 inline std::ostream& operator<<(std::ostream& os, const ByteOrderedPartitioner::Token& token) {
155   for (ByteOrderedPartitioner::Token::const_iterator it = token.begin(), end = token.end();
156        it != end; ++it) {
157     os << std::hex << *it;
158   }
159   return os;
160 }
161 
162 class HostSet : public DenseHashSet<Host::Ptr> {
163 public:
HostSet()164   HostSet() {
165     set_empty_key(Host::Ptr(new Host(Address::EMPTY_KEY)));
166     set_deleted_key(Host::Ptr(new Host(Address::DELETED_KEY)));
167   }
168 
169   template <class InputIterator>
HostSet(InputIterator first,InputIterator last)170   HostSet(InputIterator first, InputIterator last)
171       : DenseHashSet<Host::Ptr>(first, last, Host::Ptr(new Host(Address::EMPTY_KEY))) {
172     set_deleted_key(Host::Ptr(new Host(Address::DELETED_KEY)));
173   }
174 };
175 
176 class RackSet : public DenseHashSet<uint32_t> {
177 public:
RackSet()178   RackSet() {
179     set_empty_key(IdGenerator::EMPTY_KEY);
180     set_deleted_key(IdGenerator::DELETED_KEY);
181   }
182 };
183 
184 struct Datacenter {
Datacenterdatastax::internal::core::Datacenter185   Datacenter()
186       : num_nodes(0) {}
187   size_t num_nodes;
188   RackSet racks;
189 };
190 
191 class DatacenterMap : public DenseHashMap<uint32_t, Datacenter> {
192 public:
DatacenterMap()193   DatacenterMap() {
194     set_empty_key(IdGenerator::EMPTY_KEY);
195     set_deleted_key(IdGenerator::DELETED_KEY);
196   }
197 };
198 
199 struct ReplicationFactor {
ReplicationFactordatastax::internal::core::ReplicationFactor200   ReplicationFactor()
201       : count(0) {}
202   size_t count;
203   String name; // Used for logging the datacenter name
operator ==datastax::internal::core::ReplicationFactor204   bool operator==(const ReplicationFactor& other) const {
205     return count == other.count && name == other.name;
206   }
207 };
208 
build_datacenters(const HostSet & hosts,DatacenterMap & result)209 inline void build_datacenters(const HostSet& hosts, DatacenterMap& result) {
210   result.clear();
211   for (HostSet::const_iterator i = hosts.begin(), end = hosts.end(); i != end; ++i) {
212     uint32_t dc = (*i)->dc_id();
213     uint32_t rack = (*i)->rack_id();
214     if (dc != 0 && rack != 0) {
215       Datacenter& datacenter = result[dc];
216       datacenter.racks.insert(rack);
217       datacenter.num_nodes++;
218     }
219   }
220 }
221 
222 class ReplicationFactorMap : public DenseHashMap<uint32_t, ReplicationFactor> {
223 public:
ReplicationFactorMap()224   ReplicationFactorMap() { set_empty_key(IdGenerator::EMPTY_KEY); }
225 };
226 
227 template <class Partitioner>
228 class ReplicationStrategy {
229 public:
230   typedef typename Partitioner::Token Token;
231 
232   typedef std::pair<Token, Host*> TokenHost;
233   typedef Vector<TokenHost> TokenHostVec;
234 
235   typedef std::pair<Token, CopyOnWriteHostVec> TokenReplicas;
236   typedef Vector<TokenReplicas> TokenReplicasVec;
237 
238   typedef Deque<typename TokenHostVec::const_iterator> TokenHostQueue;
239 
240   struct DatacenterRackInfo {
DatacenterRackInfodatastax::internal::core::ReplicationStrategy::DatacenterRackInfo241     DatacenterRackInfo()
242         : replica_count(0)
243         , replication_factor(0)
244         , rack_count(0) {}
245     size_t replica_count;
246     size_t replication_factor;
247     RackSet racks_observed;
248     size_t rack_count;
249     TokenHostQueue skipped_endpoints;
250   };
251 
252   class DatacenterRackInfoMap : public DenseHashMap<uint32_t, DatacenterRackInfo> {
253   public:
DatacenterRackInfoMap()254     DatacenterRackInfoMap() {
255       DenseHashMap<uint32_t, DatacenterRackInfo>::set_empty_key(IdGenerator::EMPTY_KEY);
256     }
257   };
258 
259   enum Type { NETWORK_TOPOLOGY_STRATEGY, SIMPLE_STRATEGY, NON_REPLICATED };
260 
ReplicationStrategy()261   ReplicationStrategy()
262       : type_(NON_REPLICATED) {}
263 
264   void init(IdGenerator& dc_ids, const VersionNumber& cassandra_version, const Row* row);
265 
operator !=(const ReplicationStrategy & other) const266   bool operator!=(const ReplicationStrategy& other) const {
267     return type_ != other.type_ || replication_factors_ != other.replication_factors_;
268   }
269 
270   void build_replicas(const TokenHostVec& tokens, const DatacenterMap& datacenters,
271                       TokenReplicasVec& result) const;
272 
273 private:
274   void build_replicas_network_topology(const TokenHostVec& tokens, const DatacenterMap& datacenters,
275                                        TokenReplicasVec& result) const;
276   void build_replicas_simple(const TokenHostVec& tokens, const DatacenterMap& datacenters,
277                              TokenReplicasVec& result) const;
278   void build_replicas_non_replicated(const TokenHostVec& tokens, const DatacenterMap& datacenters,
279                                      TokenReplicasVec& result) const;
280 
281 private:
282   Type type_;
283   ReplicationFactorMap replication_factors_;
284 };
285 
286 template <class Partitioner>
init(IdGenerator & dc_ids,const VersionNumber & cassandra_version,const Row * row)287 void ReplicationStrategy<Partitioner>::init(IdGenerator& dc_ids,
288                                             const VersionNumber& cassandra_version,
289                                             const Row* row) {
290   StringRef strategy_class;
291 
292   if (cassandra_version >= VersionNumber(3, 0, 0)) {
293     const Value* value = row->get_by_name("replication");
294     if (value && value->is_map() && is_string_type(value->primary_value_type()) &&
295         is_string_type(value->secondary_value_type())) {
296       MapIterator iterator(value);
297       while (iterator.next()) {
298         String key(iterator.key()->to_string());
299         if (key == "class") {
300           strategy_class = iterator.value()->to_string_ref();
301         } else {
302           String value(iterator.value()->to_string());
303           size_t count = strtoul(value.c_str(), NULL, 10);
304           if (count > 0) {
305             ReplicationFactor replication_factor;
306             replication_factor.count = count;
307             replication_factor.name = key;
308             if (key == "replication_factor") {
309               replication_factors_[1] = replication_factor;
310             } else {
311               replication_factors_[dc_ids.get(key)] = replication_factor;
312             }
313           } else {
314             LOG_WARN("Replication factor of 0 for option %s", key.c_str());
315           }
316         }
317       }
318     }
319   } else {
320     const Value* value;
321     value = row->get_by_name("strategy_class");
322     if (value && is_string_type(value->value_type())) {
323       strategy_class = value->to_string_ref();
324     }
325 
326     value = row->get_by_name("strategy_options");
327 
328     Vector<char> buf = value->decoder().as_vector();
329     json::Document d;
330     d.ParseInsitu(&buf[0]);
331 
332     if (!d.HasParseError() && d.IsObject()) {
333       for (json::Value::ConstMemberIterator i = d.MemberBegin(); i != d.MemberEnd(); ++i) {
334         String key(i->name.GetString(), i->name.GetStringLength());
335         String value(i->value.GetString(), i->value.GetStringLength());
336         size_t count = strtoul(value.c_str(), NULL, 10);
337         if (count > 0) {
338           ReplicationFactor replication_factor;
339           replication_factor.count = count;
340           replication_factor.name = key;
341           if (key == "replication_factor") {
342             replication_factors_[1] = replication_factor;
343           } else {
344             replication_factors_[dc_ids.get(key)] = replication_factor;
345           }
346         } else {
347           LOG_WARN("Replication factor of 0 for option %s", key.c_str());
348         }
349       }
350     }
351   }
352 
353   if (ends_with(strategy_class, CASS_NETWORK_TOPOLOGY_STRATEGY)) {
354     type_ = NETWORK_TOPOLOGY_STRATEGY;
355   } else if (ends_with(strategy_class, CASS_SIMPLE_STRATEGY)) {
356     type_ = SIMPLE_STRATEGY;
357   }
358 }
359 
360 template <class Partitioner>
build_replicas(const TokenHostVec & tokens,const DatacenterMap & datacenters,TokenReplicasVec & result) const361 void ReplicationStrategy<Partitioner>::build_replicas(const TokenHostVec& tokens,
362                                                       const DatacenterMap& datacenters,
363                                                       TokenReplicasVec& result) const {
364   result.clear();
365   result.reserve(tokens.size());
366 
367   switch (type_) {
368     case NETWORK_TOPOLOGY_STRATEGY:
369       build_replicas_network_topology(tokens, datacenters, result);
370       break;
371     case SIMPLE_STRATEGY:
372       build_replicas_simple(tokens, datacenters, result);
373       break;
374     default:
375       build_replicas_non_replicated(tokens, datacenters, result);
376       break;
377   }
378 }
379 
380 // Adds unique replica. It returns true if the replica was added.
add_replica(CopyOnWriteHostVec & hosts,const Host::Ptr & host)381 inline bool add_replica(CopyOnWriteHostVec& hosts, const Host::Ptr& host) {
382   for (HostVec::const_reverse_iterator it = hosts->rbegin(); it != hosts->rend(); ++it) {
383     if ((*it)->address() == host->address()) {
384       return false; // Already in the replica set
385     }
386   }
387   hosts->push_back(host);
388   return true;
389 }
390 
// Builds the replica list of every token for a network topology strategy.
//
// For each token the ring is walked once, starting at that token and wrapping
// around, collecting hosts per datacenter until each datacenter's effective
// replication factor is met. Within a datacenter, hosts on racks that have not
// been used yet are preferred; hosts on already-used racks are queued and only
// added once every rack of the datacenter has been seen.
//
// Nothing is appended to `result` when there are no replication factors, or
// when none of the configured datacenters is present in `datacenters`.
//
// @param tokens Token/host pairs in ring order.
// @param datacenters Known datacenters with their node counts and racks.
// @param result Receives one (token, replicas) entry per token.
template <class Partitioner>
void ReplicationStrategy<Partitioner>::build_replicas_network_topology(
    const TokenHostVec& tokens, const DatacenterMap& datacenters, TokenReplicasVec& result) const {
  if (replication_factors_.empty()) {
    return;
  }

  DatacenterRackInfoMap dc_racks;
  dc_racks.resize(datacenters.size());

  // Total number of replicas expected per token across all datacenters.
  size_t num_replicas = 0;

  // Populate the datacenter and rack information. Only valid datacenters that
  // actually have hosts are considered. A replication factor for a datacenter
  // that doesn't exist, or that has no nodes, is not counted.
  for (ReplicationFactorMap::const_iterator i = replication_factors_.begin(),
                                            end = replication_factors_.end();
       i != end; ++i) {
    DatacenterMap::const_iterator j = datacenters.find(i->first);
    // Don't include datacenters that don't exist
    if (j != datacenters.end()) {
      // A replication factor cannot exceed the number of nodes in a datacenter
      size_t replication_factor = std::min<size_t>(i->second.count, j->second.num_nodes);
      num_replicas += replication_factor;
      DatacenterRackInfo dc_rack_info;
      dc_rack_info.replication_factor = replication_factor;
      dc_rack_info.rack_count = j->second.racks.size();
      dc_racks[j->first] = dc_rack_info;
    } else {
      LOG_WARN("No nodes in datacenter '%s'. Check your replication strategies.",
               i->second.name.c_str());
    }
  }

  if (num_replicas == 0) {
    return;
  }

  for (typename TokenHostVec::const_iterator i = tokens.begin(), end = tokens.end(); i != end;
       ++i) {
    Token token = i->first;
    // Position of the ring walk for this token; starts at the token itself.
    typename TokenHostVec::const_iterator token_it = i;

    CopyOnWriteHostVec replicas(new HostVec());
    replicas->reserve(num_replicas);

    // Clear datacenter and rack information for the next token
    for (typename DatacenterRackInfoMap::iterator j = dc_racks.begin(), end = dc_racks.end();
         j != end; ++j) {
      j->second.replica_count = 0;
      j->second.racks_observed.clear();
      j->second.skipped_endpoints.clear();
    }

    // `j` only bounds the walk to one full pass over the ring; the host under
    // consideration is always the one at `token_it`.
    for (typename TokenHostVec::const_iterator j = tokens.begin(), end = tokens.end();
         j != end && replicas->size() < num_replicas; ++j) {
      typename TokenHostVec::const_iterator curr_token_it = token_it;
      Host* host = curr_token_it->second;
      uint32_t dc = host->dc_id();
      uint32_t rack = host->rack_id();

      // Advance the walk now (wrapping at the end of the ring) so that the
      // `continue` statements below cannot skip it.
      ++token_it;
      if (token_it == tokens.end()) {
        token_it = tokens.begin();
      }

      // Hosts in datacenters without a usable replication factor are ignored.
      typename DatacenterRackInfoMap::iterator dc_rack_it = dc_racks.find(dc);
      if (dc_rack_it == dc_racks.end()) {
        continue;
      }

      DatacenterRackInfo& dc_rack_info = dc_rack_it->second;

      size_t& replica_count_this_dc = dc_rack_info.replica_count;
      const size_t replication_factor = dc_rack_info.replication_factor;

      // This datacenter already has all of its replicas for this token.
      if (replica_count_this_dc >= replication_factor) {
        continue;
      }

      RackSet& racks_observed_this_dc = dc_rack_info.racks_observed;
      const size_t rack_count_this_dc = dc_rack_info.rack_count;

      // First, attempt to distribute replicas over all possible racks in a
      // datacenter; only then consider hosts in the same rack

      if (rack == 0 || racks_observed_this_dc.size() == rack_count_this_dc) {
        // No rack information for this host, or every rack has already been
        // used: take the host directly.
        if (add_replica(replicas, Host::Ptr(host))) {
          ++replica_count_this_dc;
        }
      } else {
        TokenHostQueue& skipped_endpoints_this_dc = dc_rack_info.skipped_endpoints;
        if (racks_observed_this_dc.count(rack) > 0) {
          // Rack already used: remember the host in case it is needed later.
          skipped_endpoints_this_dc.push_back(curr_token_it);
        } else {
          if (add_replica(replicas, Host::Ptr(host))) {
            ++replica_count_this_dc;
            racks_observed_this_dc.insert(rack);
          }

          // Once every rack in the current datacenter has been visited, start
          // considering the hosts that were skipped earlier.
          if (racks_observed_this_dc.size() == rack_count_this_dc) {
            while (!skipped_endpoints_this_dc.empty() &&
                   replica_count_this_dc < replication_factor) {
              if (add_replica(replicas, Host::Ptr(skipped_endpoints_this_dc.front()->second))) {
                ++replica_count_this_dc;
              }
              skipped_endpoints_this_dc.pop_front();
            }
          }
        }
      }
    }

    result.push_back(TokenReplicas(token, replicas));
  }
}
510 
511 template <class Partitioner>
build_replicas_simple(const TokenHostVec & tokens,const DatacenterMap & not_used,TokenReplicasVec & result) const512 void ReplicationStrategy<Partitioner>::build_replicas_simple(const TokenHostVec& tokens,
513                                                              const DatacenterMap& not_used,
514                                                              TokenReplicasVec& result) const {
515   ReplicationFactorMap::const_iterator it = replication_factors_.find(1);
516   if (it == replication_factors_.end()) {
517     return;
518   }
519   const size_t num_tokens = tokens.size();
520   const size_t num_replicas = std::min<size_t>(it->second.count, num_tokens);
521   for (typename TokenHostVec::const_iterator i = tokens.begin(), end = tokens.end(); i != end;
522        ++i) {
523     CopyOnWriteHostVec replicas(new HostVec());
524     replicas->reserve(num_replicas);
525     typename TokenHostVec::const_iterator token_it = i;
526     for (size_t j = 0; j < num_tokens && replicas->size() < num_replicas; ++j) {
527       add_replica(replicas, Host::Ptr(Host::Ptr(token_it->second)));
528       ++token_it;
529       if (token_it == tokens.end()) {
530         token_it = tokens.begin();
531       }
532     }
533     result.push_back(TokenReplicas(i->first, replicas));
534   }
535 }
536 
537 template <class Partitioner>
build_replicas_non_replicated(const TokenHostVec & tokens,const DatacenterMap & not_used,TokenReplicasVec & result) const538 void ReplicationStrategy<Partitioner>::build_replicas_non_replicated(
539     const TokenHostVec& tokens, const DatacenterMap& not_used, TokenReplicasVec& result) const {
540   for (typename TokenHostVec::const_iterator i = tokens.begin(); i != tokens.end(); ++i) {
541     CopyOnWriteHostVec replicas(new HostVec(1, Host::Ptr(i->second)));
542     result.push_back(TokenReplicas(i->first, replicas));
543   }
544 }
545 
546 template <class Partitioner>
547 class TokenMapImpl : public TokenMap {
548 public:
549   typedef typename Partitioner::Token Token;
550 
551   typedef std::pair<Token, Host*> TokenHost;
552   typedef Vector<TokenHost> TokenHostVec;
553 
554   struct TokenHostCompare {
operator ()datastax::internal::core::TokenMapImpl::TokenHostCompare555     bool operator()(const TokenHost& lhs, const TokenHost& rhs) const {
556       return lhs.first < rhs.first;
557     }
558   };
559 
560   struct RemoveTokenHostIf {
RemoveTokenHostIfdatastax::internal::core::TokenMapImpl::RemoveTokenHostIf561     RemoveTokenHostIf(const Host::Ptr& host)
562         : host(host) {}
563 
operator ()datastax::internal::core::TokenMapImpl::RemoveTokenHostIf564     bool operator()(const TokenHost& token) const {
565       if (!token.second) {
566         return false;
567       }
568       return token.second->address() == host->address();
569     }
570 
571     const Host::Ptr& host;
572   };
573 
574   typedef std::pair<Token, CopyOnWriteHostVec> TokenReplicas;
575   typedef Vector<TokenReplicas> TokenReplicasVec;
576 
577   struct TokenReplicasCompare {
operator ()datastax::internal::core::TokenMapImpl::TokenReplicasCompare578     bool operator()(const TokenReplicas& lhs, const TokenReplicas& rhs) const {
579       return lhs.first < rhs.first;
580     }
581   };
582 
583   typedef DenseHashMap<String, TokenReplicasVec> KeyspaceReplicaMap;
584   typedef DenseHashMap<String, ReplicationStrategy<Partitioner> > KeyspaceStrategyMap;
585 
TokenMapImpl()586   TokenMapImpl()
587       : no_replicas_dummy_(NULL) {
588     replicas_.set_empty_key(String());
589     replicas_.set_deleted_key(String(1, '\0'));
590     strategies_.set_empty_key(String());
591     strategies_.set_deleted_key(String(1, '\0'));
592   }
593 
TokenMapImpl(const TokenMapImpl & other)594   TokenMapImpl(const TokenMapImpl& other)
595       : tokens_(other.tokens_)
596       , hosts_(other.hosts_)
597       , replicas_(other.replicas_)
598       , strategies_(other.strategies_)
599       , rack_ids_(other.rack_ids_)
600       , dc_ids_(other.dc_ids_)
601       , no_replicas_dummy_(NULL) {}
602 
603   virtual void add_host(const Host::Ptr& host);
604   virtual void update_host_and_build(const Host::Ptr& host);
605   virtual void remove_host_and_build(const Host::Ptr& host);
606 
607   virtual void add_keyspaces(const VersionNumber& cassandra_version, const ResultResponse* result);
608   virtual void update_keyspaces_and_build(const VersionNumber& cassandra_version,
609                                           const ResultResponse* result);
610   virtual void drop_keyspace(const String& keyspace_name);
611 
612   virtual void build();
613 
614   virtual TokenMap::Ptr copy() const;
615 
616   virtual const CopyOnWriteHostVec& get_replicas(const String& keyspace_name,
617                                                  const String& routing_key) const;
618 
619   virtual String dump(const String& keyspace_name) const;
620 
621 public:
622   // Testing only
623 
contains(const Token & token) const624   bool contains(const Token& token) const {
625     for (typename TokenHostVec::const_iterator i = tokens_.begin(), end = tokens_.end(); i != end;
626          ++i) {
627       if (token == i->first) return true;
628     }
629     return false;
630   }
631 
632   const TokenReplicasVec& token_replicas(const String& keyspace_name) const;
633 
634 private:
635   void update_keyspace(const VersionNumber& cassandra_version, const ResultResponse* result,
636                        bool should_build_replicas);
637   void remove_host_tokens(const Host::Ptr& host);
638   void update_host_ids(const Host::Ptr& host);
639   void build_replicas();
640 
641 private:
642   TokenHostVec tokens_;
643   HostSet hosts_;
644   DatacenterMap datacenters_;
645   KeyspaceReplicaMap replicas_;
646   KeyspaceStrategyMap strategies_;
647   IdGenerator rack_ids_;
648   IdGenerator dc_ids_;
649   CopyOnWriteHostVec no_replicas_dummy_;
650 };
651 
652 template <class Partitioner>
add_host(const Host::Ptr & host)653 void TokenMapImpl<Partitioner>::add_host(const Host::Ptr& host) {
654   update_host_ids(host);
655   hosts_.insert(host);
656 
657   const Vector<String>& tokens(host->tokens());
658   for (Vector<String>::const_iterator it = tokens.begin(), end = tokens.end(); it != end; ++it) {
659     Token token = Partitioner::from_string(*it);
660     tokens_.push_back(TokenHost(token, host.get()));
661   }
662 }
663 
664 template <class Partitioner>
update_host_and_build(const Host::Ptr & host)665 void TokenMapImpl<Partitioner>::update_host_and_build(const Host::Ptr& host) {
666   uint64_t start = uv_hrtime();
667   remove_host_tokens(host);
668 
669   update_host_ids(host);
670   hosts_.insert(host);
671 
672   TokenHostVec new_tokens;
673   const Vector<String>& tokens(host->tokens());
674   for (Vector<String>::const_iterator it = tokens.begin(), end = tokens.end(); it != end; ++it) {
675     Token token = Partitioner::from_string(*it);
676     new_tokens.push_back(TokenHost(token, host.get()));
677   }
678 
679   std::sort(new_tokens.begin(), new_tokens.end());
680 
681   TokenHostVec merged(tokens_.size() + new_tokens.size());
682   std::merge(tokens_.begin(), tokens_.end(), new_tokens.begin(), new_tokens.end(), merged.begin(),
683              TokenHostCompare());
684   tokens_ = merged;
685 
686   build_replicas();
687   LOG_DEBUG("Updated token map with host %s (%u tokens). Rebuilt token map with %u hosts and %u "
688             "tokens in %f ms",
689             host->address_string().c_str(), (unsigned int)new_tokens.size(),
690             (unsigned int)hosts_.size(), (unsigned int)tokens_.size(),
691             (double)(uv_hrtime() - start) / (1000.0 * 1000.0));
692 }
693 
694 template <class Partitioner>
remove_host_and_build(const Host::Ptr & host)695 void TokenMapImpl<Partitioner>::remove_host_and_build(const Host::Ptr& host) {
696   if (hosts_.find(host) == hosts_.end()) return;
697   uint64_t start = uv_hrtime();
698   remove_host_tokens(host);
699   hosts_.erase(host);
700   build_replicas();
701   LOG_DEBUG(
702       "Removed host %s from token map. Rebuilt token map with %u hosts and %u tokens in %f ms",
703       host->address_string().c_str(), (unsigned int)hosts_.size(), (unsigned int)tokens_.size(),
704       (double)(uv_hrtime() - start) / (1000.0 * 1000.0));
705 }
706 
707 template <class Partitioner>
add_keyspaces(const VersionNumber & cassandra_version,const ResultResponse * result)708 void TokenMapImpl<Partitioner>::add_keyspaces(const VersionNumber& cassandra_version,
709                                               const ResultResponse* result) {
710   update_keyspace(cassandra_version, result, false);
711 }
712 
713 template <class Partitioner>
update_keyspaces_and_build(const VersionNumber & cassandra_version,const ResultResponse * result)714 void TokenMapImpl<Partitioner>::update_keyspaces_and_build(const VersionNumber& cassandra_version,
715                                                            const ResultResponse* result) {
716   update_keyspace(cassandra_version, result, true);
717 }
718 
719 template <class Partitioner>
drop_keyspace(const String & keyspace_name)720 void TokenMapImpl<Partitioner>::drop_keyspace(const String& keyspace_name) {
721   replicas_.erase(keyspace_name);
722   strategies_.erase(keyspace_name);
723 }
724 
725 template <class Partitioner>
build()726 void TokenMapImpl<Partitioner>::build() {
727   uint64_t start = uv_hrtime();
728   std::sort(tokens_.begin(), tokens_.end());
729   build_replicas();
730   LOG_DEBUG("Built token map with %u hosts and %u tokens in %f ms", (unsigned int)hosts_.size(),
731             (unsigned int)tokens_.size(), (double)(uv_hrtime() - start) / (1000.0 * 1000.0));
732 }
733 
734 template <class Partitioner>
copy() const735 TokenMap::Ptr TokenMapImpl<Partitioner>::copy() const {
736   return Ptr(new TokenMapImpl<Partitioner>(*this));
737 }
738 
739 template <class Partitioner>
get_replicas(const String & keyspace_name,const String & routing_key) const740 const CopyOnWriteHostVec& TokenMapImpl<Partitioner>::get_replicas(const String& keyspace_name,
741                                                                   const String& routing_key) const {
742   typename KeyspaceReplicaMap::const_iterator ks_it = replicas_.find(keyspace_name);
743 
744   if (ks_it != replicas_.end()) {
745     Token token = Partitioner::hash(routing_key);
746     const TokenReplicasVec& replicas = ks_it->second;
747     typename TokenReplicasVec::const_iterator replicas_it =
748         std::upper_bound(replicas.begin(), replicas.end(), TokenReplicas(token, no_replicas_dummy_),
749                          TokenReplicasCompare());
750     if (replicas_it != replicas.end()) {
751       return replicas_it->second;
752     } else if (!replicas.empty()) {
753       return replicas.front().second;
754     }
755   }
756 
757   return no_replicas_dummy_;
758 }
759 
760 template <class Partitioner>
dump(const String & keyspace_name) const761 String TokenMapImpl<Partitioner>::dump(const String& keyspace_name) const {
762   String result;
763   typename KeyspaceReplicaMap::const_iterator ks_it = replicas_.find(keyspace_name);
764   const TokenReplicasVec& replicas = ks_it->second;
765 
766   for (typename TokenReplicasVec::const_iterator it = replicas.begin(), end = replicas.end();
767        it != end; ++it) {
768     OStringStream ss;
769     ss << std::setw(20) << it->first << " [ ";
770     const CopyOnWriteHostVec& hosts = it->second;
771     for (HostVec::const_iterator host_it = hosts->begin(), end = hosts->end(); host_it != end;
772          ++host_it) {
773       ss << (*host_it)->address_string() << " ";
774     }
775     ss << "]\n";
776     result.append(ss.str());
777   }
778   return result;
779 }
780 
781 template <class Partitioner>
782 const typename TokenMapImpl<Partitioner>::TokenReplicasVec&
token_replicas(const String & keyspace_name) const783 TokenMapImpl<Partitioner>::token_replicas(const String& keyspace_name) const {
784   typename KeyspaceReplicaMap::const_iterator ks_it = replicas_.find(keyspace_name);
785   static TokenReplicasVec not_found;
786   return ks_it != replicas_.end() ? ks_it->second : not_found;
787 }
788 
789 template <class Partitioner>
update_keyspace(const VersionNumber & cassandra_version,const ResultResponse * result,bool should_build_replicas)790 void TokenMapImpl<Partitioner>::update_keyspace(const VersionNumber& cassandra_version,
791                                                 const ResultResponse* result,
792                                                 bool should_build_replicas) {
793   ResultIterator rows(result);
794 
795   while (rows.next()) {
796     String keyspace_name;
797     const Row* row = rows.row();
798 
799     if (!row->get_string_by_name("keyspace_name", &keyspace_name)) {
800       LOG_ERROR("Unable to get column value for 'keyspace_name'");
801       continue;
802     }
803 
804     ReplicationStrategy<Partitioner> strategy;
805 
806     strategy.init(dc_ids_, cassandra_version, row);
807 
808     typename KeyspaceStrategyMap::iterator i = strategies_.find(keyspace_name);
809     if (i == strategies_.end() || i->second != strategy) {
810       if (i == strategies_.end()) {
811         strategies_[keyspace_name] = strategy;
812       } else {
813         i->second = strategy;
814       }
815       if (should_build_replicas) {
816         uint64_t start = uv_hrtime();
817         build_datacenters(hosts_, datacenters_);
818         strategy.build_replicas(tokens_, datacenters_, replicas_[keyspace_name]);
819         LOG_DEBUG("Updated token map with keyspace '%s'. Rebuilt token map with %u hosts and %u "
820                   "tokens in %f ms",
821                   keyspace_name.c_str(), (unsigned int)hosts_.size(), (unsigned int)tokens_.size(),
822                   (double)(uv_hrtime() - start) / (1000.0 * 1000.0));
823       }
824     }
825   }
826 }
827 
828 template <class Partitioner>
remove_host_tokens(const Host::Ptr & host)829 void TokenMapImpl<Partitioner>::remove_host_tokens(const Host::Ptr& host) {
830   typename TokenHostVec::iterator last =
831       std::remove_copy_if(tokens_.begin(), tokens_.end(), tokens_.begin(), RemoveTokenHostIf(host));
832   tokens_.resize(last - tokens_.begin());
833 }
834 
835 template <class Partitioner>
update_host_ids(const Host::Ptr & host)836 void TokenMapImpl<Partitioner>::update_host_ids(const Host::Ptr& host) {
837   host->set_rack_and_dc_ids(rack_ids_.get(host->rack()), dc_ids_.get(host->dc()));
838 }
839 
840 template <class Partitioner>
build_replicas()841 void TokenMapImpl<Partitioner>::build_replicas() {
842   build_datacenters(hosts_, datacenters_);
843   for (typename KeyspaceStrategyMap::const_iterator i = strategies_.begin(),
844                                                     end = strategies_.end();
845        i != end; ++i) {
846     const String& keyspace_name = i->first;
847     const ReplicationStrategy<Partitioner>& strategy = i->second;
848     strategy.build_replicas(tokens_, datacenters_, replicas_[keyspace_name]);
849     LOG_TRACE("Replicas for keyspace '%s':\n%s", keyspace_name.c_str(),
850               dump(keyspace_name).c_str());
851   }
852 }
853 
854 }}} // namespace datastax::internal::core
855 
856 #endif
857