1 /*
2 Copyright (c) DataStax, Inc.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #ifndef DATASTAX_INTERNAL_TOKEN_MAP_IMPL_HPP
18 #define DATASTAX_INTERNAL_TOKEN_MAP_IMPL_HPP
19
20 #include "collection_iterator.hpp"
21 #include "constants.hpp"
22 #include "dense_hash_map.hpp"
23 #include "dense_hash_set.hpp"
24 #include "deque.hpp"
25 #include "json.hpp"
26 #include "map_iterator.hpp"
27 #include "result_iterator.hpp"
28 #include "result_response.hpp"
29 #include "row.hpp"
30 #include "string_ref.hpp"
31 #include "token_map.hpp"
32 #include "value.hpp"
33 #include "vector.hpp"
34
35 #include <algorithm>
36 #include <assert.h>
37 #include <iomanip>
38 #include <ios>
39 #include <uv.h>
40
41 #define CASS_NETWORK_TOPOLOGY_STRATEGY "NetworkTopologyStrategy"
42 #define CASS_SIMPLE_STRATEGY "SimpleStrategy"
43
44 namespace std {
45
46 template <>
47 struct equal_to<datastax::internal::core::Host::Ptr> {
operator ()std::equal_to48 bool operator()(const datastax::internal::core::Host::Ptr& lhs,
49 const datastax::internal::core::Host::Ptr& rhs) const {
50 if (lhs == rhs) {
51 return true;
52 }
53 if (!lhs || !rhs) {
54 return false;
55 }
56 return lhs->address() == rhs->address();
57 }
58 };
59
60 #if defined(HASH_IN_TR1) && !defined(_WIN32)
61 namespace tr1 {
62 #endif
63
64 template <>
65 struct hash<datastax::internal::core::Host::Ptr> {
operator ()std::tr1::hash66 std::size_t operator()(const datastax::internal::core::Host::Ptr& host) const {
67 if (!host) return 0;
68 return hasher(host->address());
69 }
70 SPARSEHASH_HASH<datastax::internal::core::Address> hasher;
71 };
72
73 #if defined(HASH_IN_TR1) && !defined(_WIN32)
74 } // namespace tr1
75 #endif
76
77 } // namespace std
78
79 namespace datastax { namespace internal { namespace core {
80
81 class IdGenerator {
82 public:
83 typedef DenseHashMap<String, uint32_t> IdMap;
84
85 static const uint32_t EMPTY_KEY;
86 static const uint32_t DELETED_KEY;
87
IdGenerator()88 IdGenerator() { ids_.set_empty_key(String()); }
89
get(const String & key)90 uint32_t get(const String& key) {
91 if (key.empty()) {
92 return 0;
93 }
94
95 IdMap::const_iterator i = ids_.find(key);
96 if (i != ids_.end()) {
97 return i->second;
98 }
99
100 // This will never generate a 0 identifier. So 0 can be used as
101 // inalid or empty.
102 uint32_t id = ids_.size() + 1;
103 ids_[key] = id;
104 return id;
105 }
106
107 private:
108 IdMap ids_;
109 };
110
111 struct Murmur3Partitioner {
112 typedef int64_t Token;
113
114 static Token from_string(const StringRef& str);
115 static Token hash(const StringRef& str);
namedatastax::internal::core::Murmur3Partitioner116 static StringRef name() { return "Murmur3Partitioner"; }
117 };
118
119 struct RandomPartitioner {
120 struct Token {
121 uint64_t hi;
122 uint64_t lo;
123
operator <datastax::internal::core::RandomPartitioner::Token124 bool operator<(const Token& other) const {
125 return hi == other.hi ? lo < other.lo : hi < other.hi;
126 }
127
operator ==datastax::internal::core::RandomPartitioner::Token128 bool operator==(const Token& other) const { return hi == other.hi && lo == other.lo; }
129 };
130
131 static Token abs(Token token);
132 static uint64_t encode(uint8_t* bytes);
133
134 static Token from_string(const StringRef& str);
135 static Token hash(const StringRef& str);
namedatastax::internal::core::RandomPartitioner136 static StringRef name() { return "RandomPartitioner"; }
137 };
138
139 class ByteOrderedPartitioner {
140 public:
141 typedef Vector<uint8_t> Token;
142
143 static Token from_string(const StringRef& str);
144 static Token hash(const StringRef& str);
name()145 static StringRef name() { return "ByteOrderedPartitioner"; }
146 };
147
operator <<(std::ostream & os,const RandomPartitioner::Token & token)148 inline std::ostream& operator<<(std::ostream& os, const RandomPartitioner::Token& token) {
149 os << std::setfill('0') << std::setw(16) << std::hex << token.hi << std::setfill('0')
150 << std::setw(16) << std::hex << token.lo;
151 return os;
152 }
153
operator <<(std::ostream & os,const ByteOrderedPartitioner::Token & token)154 inline std::ostream& operator<<(std::ostream& os, const ByteOrderedPartitioner::Token& token) {
155 for (ByteOrderedPartitioner::Token::const_iterator it = token.begin(), end = token.end();
156 it != end; ++it) {
157 os << std::hex << *it;
158 }
159 return os;
160 }
161
162 class HostSet : public DenseHashSet<Host::Ptr> {
163 public:
HostSet()164 HostSet() {
165 set_empty_key(Host::Ptr(new Host(Address::EMPTY_KEY)));
166 set_deleted_key(Host::Ptr(new Host(Address::DELETED_KEY)));
167 }
168
169 template <class InputIterator>
HostSet(InputIterator first,InputIterator last)170 HostSet(InputIterator first, InputIterator last)
171 : DenseHashSet<Host::Ptr>(first, last, Host::Ptr(new Host(Address::EMPTY_KEY))) {
172 set_deleted_key(Host::Ptr(new Host(Address::DELETED_KEY)));
173 }
174 };
175
176 class RackSet : public DenseHashSet<uint32_t> {
177 public:
RackSet()178 RackSet() {
179 set_empty_key(IdGenerator::EMPTY_KEY);
180 set_deleted_key(IdGenerator::DELETED_KEY);
181 }
182 };
183
184 struct Datacenter {
Datacenterdatastax::internal::core::Datacenter185 Datacenter()
186 : num_nodes(0) {}
187 size_t num_nodes;
188 RackSet racks;
189 };
190
191 class DatacenterMap : public DenseHashMap<uint32_t, Datacenter> {
192 public:
DatacenterMap()193 DatacenterMap() {
194 set_empty_key(IdGenerator::EMPTY_KEY);
195 set_deleted_key(IdGenerator::DELETED_KEY);
196 }
197 };
198
199 struct ReplicationFactor {
ReplicationFactordatastax::internal::core::ReplicationFactor200 ReplicationFactor()
201 : count(0) {}
202 size_t count;
203 String name; // Used for logging the datacenter name
operator ==datastax::internal::core::ReplicationFactor204 bool operator==(const ReplicationFactor& other) const {
205 return count == other.count && name == other.name;
206 }
207 };
208
build_datacenters(const HostSet & hosts,DatacenterMap & result)209 inline void build_datacenters(const HostSet& hosts, DatacenterMap& result) {
210 result.clear();
211 for (HostSet::const_iterator i = hosts.begin(), end = hosts.end(); i != end; ++i) {
212 uint32_t dc = (*i)->dc_id();
213 uint32_t rack = (*i)->rack_id();
214 if (dc != 0 && rack != 0) {
215 Datacenter& datacenter = result[dc];
216 datacenter.racks.insert(rack);
217 datacenter.num_nodes++;
218 }
219 }
220 }
221
222 class ReplicationFactorMap : public DenseHashMap<uint32_t, ReplicationFactor> {
223 public:
ReplicationFactorMap()224 ReplicationFactorMap() { set_empty_key(IdGenerator::EMPTY_KEY); }
225 };
226
227 template <class Partitioner>
228 class ReplicationStrategy {
229 public:
230 typedef typename Partitioner::Token Token;
231
232 typedef std::pair<Token, Host*> TokenHost;
233 typedef Vector<TokenHost> TokenHostVec;
234
235 typedef std::pair<Token, CopyOnWriteHostVec> TokenReplicas;
236 typedef Vector<TokenReplicas> TokenReplicasVec;
237
238 typedef Deque<typename TokenHostVec::const_iterator> TokenHostQueue;
239
240 struct DatacenterRackInfo {
DatacenterRackInfodatastax::internal::core::ReplicationStrategy::DatacenterRackInfo241 DatacenterRackInfo()
242 : replica_count(0)
243 , replication_factor(0)
244 , rack_count(0) {}
245 size_t replica_count;
246 size_t replication_factor;
247 RackSet racks_observed;
248 size_t rack_count;
249 TokenHostQueue skipped_endpoints;
250 };
251
252 class DatacenterRackInfoMap : public DenseHashMap<uint32_t, DatacenterRackInfo> {
253 public:
DatacenterRackInfoMap()254 DatacenterRackInfoMap() {
255 DenseHashMap<uint32_t, DatacenterRackInfo>::set_empty_key(IdGenerator::EMPTY_KEY);
256 }
257 };
258
259 enum Type { NETWORK_TOPOLOGY_STRATEGY, SIMPLE_STRATEGY, NON_REPLICATED };
260
ReplicationStrategy()261 ReplicationStrategy()
262 : type_(NON_REPLICATED) {}
263
264 void init(IdGenerator& dc_ids, const VersionNumber& cassandra_version, const Row* row);
265
operator !=(const ReplicationStrategy & other) const266 bool operator!=(const ReplicationStrategy& other) const {
267 return type_ != other.type_ || replication_factors_ != other.replication_factors_;
268 }
269
270 void build_replicas(const TokenHostVec& tokens, const DatacenterMap& datacenters,
271 TokenReplicasVec& result) const;
272
273 private:
274 void build_replicas_network_topology(const TokenHostVec& tokens, const DatacenterMap& datacenters,
275 TokenReplicasVec& result) const;
276 void build_replicas_simple(const TokenHostVec& tokens, const DatacenterMap& datacenters,
277 TokenReplicasVec& result) const;
278 void build_replicas_non_replicated(const TokenHostVec& tokens, const DatacenterMap& datacenters,
279 TokenReplicasVec& result) const;
280
281 private:
282 Type type_;
283 ReplicationFactorMap replication_factors_;
284 };
285
286 template <class Partitioner>
init(IdGenerator & dc_ids,const VersionNumber & cassandra_version,const Row * row)287 void ReplicationStrategy<Partitioner>::init(IdGenerator& dc_ids,
288 const VersionNumber& cassandra_version,
289 const Row* row) {
290 StringRef strategy_class;
291
292 if (cassandra_version >= VersionNumber(3, 0, 0)) {
293 const Value* value = row->get_by_name("replication");
294 if (value && value->is_map() && is_string_type(value->primary_value_type()) &&
295 is_string_type(value->secondary_value_type())) {
296 MapIterator iterator(value);
297 while (iterator.next()) {
298 String key(iterator.key()->to_string());
299 if (key == "class") {
300 strategy_class = iterator.value()->to_string_ref();
301 } else {
302 String value(iterator.value()->to_string());
303 size_t count = strtoul(value.c_str(), NULL, 10);
304 if (count > 0) {
305 ReplicationFactor replication_factor;
306 replication_factor.count = count;
307 replication_factor.name = key;
308 if (key == "replication_factor") {
309 replication_factors_[1] = replication_factor;
310 } else {
311 replication_factors_[dc_ids.get(key)] = replication_factor;
312 }
313 } else {
314 LOG_WARN("Replication factor of 0 for option %s", key.c_str());
315 }
316 }
317 }
318 }
319 } else {
320 const Value* value;
321 value = row->get_by_name("strategy_class");
322 if (value && is_string_type(value->value_type())) {
323 strategy_class = value->to_string_ref();
324 }
325
326 value = row->get_by_name("strategy_options");
327
328 Vector<char> buf = value->decoder().as_vector();
329 json::Document d;
330 d.ParseInsitu(&buf[0]);
331
332 if (!d.HasParseError() && d.IsObject()) {
333 for (json::Value::ConstMemberIterator i = d.MemberBegin(); i != d.MemberEnd(); ++i) {
334 String key(i->name.GetString(), i->name.GetStringLength());
335 String value(i->value.GetString(), i->value.GetStringLength());
336 size_t count = strtoul(value.c_str(), NULL, 10);
337 if (count > 0) {
338 ReplicationFactor replication_factor;
339 replication_factor.count = count;
340 replication_factor.name = key;
341 if (key == "replication_factor") {
342 replication_factors_[1] = replication_factor;
343 } else {
344 replication_factors_[dc_ids.get(key)] = replication_factor;
345 }
346 } else {
347 LOG_WARN("Replication factor of 0 for option %s", key.c_str());
348 }
349 }
350 }
351 }
352
353 if (ends_with(strategy_class, CASS_NETWORK_TOPOLOGY_STRATEGY)) {
354 type_ = NETWORK_TOPOLOGY_STRATEGY;
355 } else if (ends_with(strategy_class, CASS_SIMPLE_STRATEGY)) {
356 type_ = SIMPLE_STRATEGY;
357 }
358 }
359
360 template <class Partitioner>
build_replicas(const TokenHostVec & tokens,const DatacenterMap & datacenters,TokenReplicasVec & result) const361 void ReplicationStrategy<Partitioner>::build_replicas(const TokenHostVec& tokens,
362 const DatacenterMap& datacenters,
363 TokenReplicasVec& result) const {
364 result.clear();
365 result.reserve(tokens.size());
366
367 switch (type_) {
368 case NETWORK_TOPOLOGY_STRATEGY:
369 build_replicas_network_topology(tokens, datacenters, result);
370 break;
371 case SIMPLE_STRATEGY:
372 build_replicas_simple(tokens, datacenters, result);
373 break;
374 default:
375 build_replicas_non_replicated(tokens, datacenters, result);
376 break;
377 }
378 }
379
380 // Adds unique replica. It returns true if the replica was added.
add_replica(CopyOnWriteHostVec & hosts,const Host::Ptr & host)381 inline bool add_replica(CopyOnWriteHostVec& hosts, const Host::Ptr& host) {
382 for (HostVec::const_reverse_iterator it = hosts->rbegin(); it != hosts->rend(); ++it) {
383 if ((*it)->address() == host->address()) {
384 return false; // Already in the replica set
385 }
386 }
387 hosts->push_back(host);
388 return true;
389 }
390
// NetworkTopologyStrategy replica placement.
// For every token in `tokens`, walks the ring starting at that token (wrapping
// at the end) and picks replicas per datacenter until each datacenter's
// replication factor is met or the whole ring has been visited.
// - A datacenter's replication factor is capped at its number of counted hosts.
// - Within a datacenter, hosts in racks not yet used get replicas first.
// - Hosts in an already-used rack are queued in `skipped_endpoints` and are
//   only considered once every rack of that datacenter has been observed.
// One entry per token is appended to `result`. Nothing is appended when no
// replication factor applies to an existing datacenter.
// NOTE(review): `tokens` is assumed to be in ring (sorted) order, and the
// algorithm appears to mirror Cassandra's
// NetworkTopologyStrategy.calculateNaturalEndpoints -- confirm before changing.
template <class Partitioner>
void ReplicationStrategy<Partitioner>::build_replicas_network_topology(
    const TokenHostVec& tokens, const DatacenterMap& datacenters, TokenReplicasVec& result) const {
  if (replication_factors_.empty()) {
    return;
  }

  DatacenterRackInfoMap dc_racks;
  dc_racks.resize(datacenters.size());

  // Total replicas wanted per token across all datacenters (sum of capped factors).
  size_t num_replicas = 0;

  // Populate the datacenter and rack information. Only considering valid
  // datacenters that actually have hosts. If there's a replication factor
  // for a datacenter that doesn't exist or has no node then it will not
  // be counted.
  for (ReplicationFactorMap::const_iterator i = replication_factors_.begin(),
                                            end = replication_factors_.end();
       i != end; ++i) {
    DatacenterMap::const_iterator j = datacenters.find(i->first);
    // Don't include datacenters that don't exist
    if (j != datacenters.end()) {
      // A replication factor cannot exceed the number of nodes in a datacenter
      size_t replication_factor = std::min<size_t>(i->second.count, j->second.num_nodes);
      num_replicas += replication_factor;
      DatacenterRackInfo dc_rack_info;
      dc_rack_info.replication_factor = replication_factor;
      dc_rack_info.rack_count = j->second.racks.size();
      dc_racks[j->first] = dc_rack_info;
    } else {
      LOG_WARN("No nodes in datacenter '%s'. Check your replication strategies.",
               i->second.name.c_str());
    }
  }

  if (num_replicas == 0) {
    return;
  }

  for (typename TokenHostVec::const_iterator i = tokens.begin(), end = tokens.end(); i != end;
       ++i) {
    Token token = i->first;
    // Ring cursor; starts at the current token and wraps around at the end.
    typename TokenHostVec::const_iterator token_it = i;

    CopyOnWriteHostVec replicas(new HostVec());
    replicas->reserve(num_replicas);

    // Clear datacenter and rack information for the next token
    for (typename DatacenterRackInfoMap::iterator j = dc_racks.begin(), end = dc_racks.end();
         j != end; ++j) {
      j->second.replica_count = 0;
      j->second.racks_observed.clear();
      j->second.skipped_endpoints.clear();
    }

    // `j` only bounds the walk to one full lap of the ring; the position comes from token_it.
    for (typename TokenHostVec::const_iterator j = tokens.begin(), end = tokens.end();
         j != end && replicas->size() < num_replicas; ++j) {
      typename TokenHostVec::const_iterator curr_token_it = token_it;
      Host* host = curr_token_it->second;
      uint32_t dc = host->dc_id();
      uint32_t rack = host->rack_id();

      // Advance the cursor now so every `continue` below moves to the next ring entry.
      ++token_it;
      if (token_it == tokens.end()) {
        token_it = tokens.begin();
      }

      // Hosts in datacenters without a (valid) replication factor are ignored.
      typename DatacenterRackInfoMap::iterator dc_rack_it = dc_racks.find(dc);
      if (dc_rack_it == dc_racks.end()) {
        continue;
      }

      DatacenterRackInfo& dc_rack_info = dc_rack_it->second;

      size_t& replica_count_this_dc = dc_rack_info.replica_count;
      const size_t replication_factor = dc_rack_info.replication_factor;

      // This datacenter already has all the replicas it needs.
      if (replica_count_this_dc >= replication_factor) {
        continue;
      }

      RackSet& racks_observed_this_dc = dc_rack_info.racks_observed;
      const size_t rack_count_this_dc = dc_rack_info.rack_count;

      // First, attempt to distribute replicas over all possible racks in a
      // datacenter only then consider hosts in the same rack

      // Hosts without a rack (id 0), or any host once every rack has been
      // observed, are taken directly.
      if (rack == 0 || racks_observed_this_dc.size() == rack_count_this_dc) {
        if (add_replica(replicas, Host::Ptr(host))) {
          ++replica_count_this_dc;
        }
      } else {
        TokenHostQueue& skipped_endpoints_this_dc = dc_rack_info.skipped_endpoints;
        if (racks_observed_this_dc.count(rack) > 0) {
          // Rack already used: defer this host until all racks have been seen.
          skipped_endpoints_this_dc.push_back(curr_token_it);
        } else {
          if (add_replica(replicas, Host::Ptr(host))) {
            ++replica_count_this_dc;
            racks_observed_this_dc.insert(rack);
          }

          // Once every rack in the current datacenter has been visited, start
          // considering the hosts we already skipped (in ring order).
          if (racks_observed_this_dc.size() == rack_count_this_dc) {
            while (!skipped_endpoints_this_dc.empty() &&
                   replica_count_this_dc < replication_factor) {
              if (add_replica(replicas, Host::Ptr(skipped_endpoints_this_dc.front()->second))) {
                ++replica_count_this_dc;
              }
              skipped_endpoints_this_dc.pop_front();
            }
          }
        }
      }
    }

    result.push_back(TokenReplicas(token, replicas));
  }
}
510
511 template <class Partitioner>
build_replicas_simple(const TokenHostVec & tokens,const DatacenterMap & not_used,TokenReplicasVec & result) const512 void ReplicationStrategy<Partitioner>::build_replicas_simple(const TokenHostVec& tokens,
513 const DatacenterMap& not_used,
514 TokenReplicasVec& result) const {
515 ReplicationFactorMap::const_iterator it = replication_factors_.find(1);
516 if (it == replication_factors_.end()) {
517 return;
518 }
519 const size_t num_tokens = tokens.size();
520 const size_t num_replicas = std::min<size_t>(it->second.count, num_tokens);
521 for (typename TokenHostVec::const_iterator i = tokens.begin(), end = tokens.end(); i != end;
522 ++i) {
523 CopyOnWriteHostVec replicas(new HostVec());
524 replicas->reserve(num_replicas);
525 typename TokenHostVec::const_iterator token_it = i;
526 for (size_t j = 0; j < num_tokens && replicas->size() < num_replicas; ++j) {
527 add_replica(replicas, Host::Ptr(Host::Ptr(token_it->second)));
528 ++token_it;
529 if (token_it == tokens.end()) {
530 token_it = tokens.begin();
531 }
532 }
533 result.push_back(TokenReplicas(i->first, replicas));
534 }
535 }
536
537 template <class Partitioner>
build_replicas_non_replicated(const TokenHostVec & tokens,const DatacenterMap & not_used,TokenReplicasVec & result) const538 void ReplicationStrategy<Partitioner>::build_replicas_non_replicated(
539 const TokenHostVec& tokens, const DatacenterMap& not_used, TokenReplicasVec& result) const {
540 for (typename TokenHostVec::const_iterator i = tokens.begin(); i != tokens.end(); ++i) {
541 CopyOnWriteHostVec replicas(new HostVec(1, Host::Ptr(i->second)));
542 result.push_back(TokenReplicas(i->first, replicas));
543 }
544 }
545
546 template <class Partitioner>
547 class TokenMapImpl : public TokenMap {
548 public:
549 typedef typename Partitioner::Token Token;
550
551 typedef std::pair<Token, Host*> TokenHost;
552 typedef Vector<TokenHost> TokenHostVec;
553
554 struct TokenHostCompare {
operator ()datastax::internal::core::TokenMapImpl::TokenHostCompare555 bool operator()(const TokenHost& lhs, const TokenHost& rhs) const {
556 return lhs.first < rhs.first;
557 }
558 };
559
560 struct RemoveTokenHostIf {
RemoveTokenHostIfdatastax::internal::core::TokenMapImpl::RemoveTokenHostIf561 RemoveTokenHostIf(const Host::Ptr& host)
562 : host(host) {}
563
operator ()datastax::internal::core::TokenMapImpl::RemoveTokenHostIf564 bool operator()(const TokenHost& token) const {
565 if (!token.second) {
566 return false;
567 }
568 return token.second->address() == host->address();
569 }
570
571 const Host::Ptr& host;
572 };
573
574 typedef std::pair<Token, CopyOnWriteHostVec> TokenReplicas;
575 typedef Vector<TokenReplicas> TokenReplicasVec;
576
577 struct TokenReplicasCompare {
operator ()datastax::internal::core::TokenMapImpl::TokenReplicasCompare578 bool operator()(const TokenReplicas& lhs, const TokenReplicas& rhs) const {
579 return lhs.first < rhs.first;
580 }
581 };
582
583 typedef DenseHashMap<String, TokenReplicasVec> KeyspaceReplicaMap;
584 typedef DenseHashMap<String, ReplicationStrategy<Partitioner> > KeyspaceStrategyMap;
585
TokenMapImpl()586 TokenMapImpl()
587 : no_replicas_dummy_(NULL) {
588 replicas_.set_empty_key(String());
589 replicas_.set_deleted_key(String(1, '\0'));
590 strategies_.set_empty_key(String());
591 strategies_.set_deleted_key(String(1, '\0'));
592 }
593
TokenMapImpl(const TokenMapImpl & other)594 TokenMapImpl(const TokenMapImpl& other)
595 : tokens_(other.tokens_)
596 , hosts_(other.hosts_)
597 , replicas_(other.replicas_)
598 , strategies_(other.strategies_)
599 , rack_ids_(other.rack_ids_)
600 , dc_ids_(other.dc_ids_)
601 , no_replicas_dummy_(NULL) {}
602
603 virtual void add_host(const Host::Ptr& host);
604 virtual void update_host_and_build(const Host::Ptr& host);
605 virtual void remove_host_and_build(const Host::Ptr& host);
606
607 virtual void add_keyspaces(const VersionNumber& cassandra_version, const ResultResponse* result);
608 virtual void update_keyspaces_and_build(const VersionNumber& cassandra_version,
609 const ResultResponse* result);
610 virtual void drop_keyspace(const String& keyspace_name);
611
612 virtual void build();
613
614 virtual TokenMap::Ptr copy() const;
615
616 virtual const CopyOnWriteHostVec& get_replicas(const String& keyspace_name,
617 const String& routing_key) const;
618
619 virtual String dump(const String& keyspace_name) const;
620
621 public:
622 // Testing only
623
contains(const Token & token) const624 bool contains(const Token& token) const {
625 for (typename TokenHostVec::const_iterator i = tokens_.begin(), end = tokens_.end(); i != end;
626 ++i) {
627 if (token == i->first) return true;
628 }
629 return false;
630 }
631
632 const TokenReplicasVec& token_replicas(const String& keyspace_name) const;
633
634 private:
635 void update_keyspace(const VersionNumber& cassandra_version, const ResultResponse* result,
636 bool should_build_replicas);
637 void remove_host_tokens(const Host::Ptr& host);
638 void update_host_ids(const Host::Ptr& host);
639 void build_replicas();
640
641 private:
642 TokenHostVec tokens_;
643 HostSet hosts_;
644 DatacenterMap datacenters_;
645 KeyspaceReplicaMap replicas_;
646 KeyspaceStrategyMap strategies_;
647 IdGenerator rack_ids_;
648 IdGenerator dc_ids_;
649 CopyOnWriteHostVec no_replicas_dummy_;
650 };
651
652 template <class Partitioner>
add_host(const Host::Ptr & host)653 void TokenMapImpl<Partitioner>::add_host(const Host::Ptr& host) {
654 update_host_ids(host);
655 hosts_.insert(host);
656
657 const Vector<String>& tokens(host->tokens());
658 for (Vector<String>::const_iterator it = tokens.begin(), end = tokens.end(); it != end; ++it) {
659 Token token = Partitioner::from_string(*it);
660 tokens_.push_back(TokenHost(token, host.get()));
661 }
662 }
663
664 template <class Partitioner>
update_host_and_build(const Host::Ptr & host)665 void TokenMapImpl<Partitioner>::update_host_and_build(const Host::Ptr& host) {
666 uint64_t start = uv_hrtime();
667 remove_host_tokens(host);
668
669 update_host_ids(host);
670 hosts_.insert(host);
671
672 TokenHostVec new_tokens;
673 const Vector<String>& tokens(host->tokens());
674 for (Vector<String>::const_iterator it = tokens.begin(), end = tokens.end(); it != end; ++it) {
675 Token token = Partitioner::from_string(*it);
676 new_tokens.push_back(TokenHost(token, host.get()));
677 }
678
679 std::sort(new_tokens.begin(), new_tokens.end());
680
681 TokenHostVec merged(tokens_.size() + new_tokens.size());
682 std::merge(tokens_.begin(), tokens_.end(), new_tokens.begin(), new_tokens.end(), merged.begin(),
683 TokenHostCompare());
684 tokens_ = merged;
685
686 build_replicas();
687 LOG_DEBUG("Updated token map with host %s (%u tokens). Rebuilt token map with %u hosts and %u "
688 "tokens in %f ms",
689 host->address_string().c_str(), (unsigned int)new_tokens.size(),
690 (unsigned int)hosts_.size(), (unsigned int)tokens_.size(),
691 (double)(uv_hrtime() - start) / (1000.0 * 1000.0));
692 }
693
694 template <class Partitioner>
remove_host_and_build(const Host::Ptr & host)695 void TokenMapImpl<Partitioner>::remove_host_and_build(const Host::Ptr& host) {
696 if (hosts_.find(host) == hosts_.end()) return;
697 uint64_t start = uv_hrtime();
698 remove_host_tokens(host);
699 hosts_.erase(host);
700 build_replicas();
701 LOG_DEBUG(
702 "Removed host %s from token map. Rebuilt token map with %u hosts and %u tokens in %f ms",
703 host->address_string().c_str(), (unsigned int)hosts_.size(), (unsigned int)tokens_.size(),
704 (double)(uv_hrtime() - start) / (1000.0 * 1000.0));
705 }
706
707 template <class Partitioner>
add_keyspaces(const VersionNumber & cassandra_version,const ResultResponse * result)708 void TokenMapImpl<Partitioner>::add_keyspaces(const VersionNumber& cassandra_version,
709 const ResultResponse* result) {
710 update_keyspace(cassandra_version, result, false);
711 }
712
713 template <class Partitioner>
update_keyspaces_and_build(const VersionNumber & cassandra_version,const ResultResponse * result)714 void TokenMapImpl<Partitioner>::update_keyspaces_and_build(const VersionNumber& cassandra_version,
715 const ResultResponse* result) {
716 update_keyspace(cassandra_version, result, true);
717 }
718
719 template <class Partitioner>
drop_keyspace(const String & keyspace_name)720 void TokenMapImpl<Partitioner>::drop_keyspace(const String& keyspace_name) {
721 replicas_.erase(keyspace_name);
722 strategies_.erase(keyspace_name);
723 }
724
725 template <class Partitioner>
build()726 void TokenMapImpl<Partitioner>::build() {
727 uint64_t start = uv_hrtime();
728 std::sort(tokens_.begin(), tokens_.end());
729 build_replicas();
730 LOG_DEBUG("Built token map with %u hosts and %u tokens in %f ms", (unsigned int)hosts_.size(),
731 (unsigned int)tokens_.size(), (double)(uv_hrtime() - start) / (1000.0 * 1000.0));
732 }
733
734 template <class Partitioner>
copy() const735 TokenMap::Ptr TokenMapImpl<Partitioner>::copy() const {
736 return Ptr(new TokenMapImpl<Partitioner>(*this));
737 }
738
739 template <class Partitioner>
get_replicas(const String & keyspace_name,const String & routing_key) const740 const CopyOnWriteHostVec& TokenMapImpl<Partitioner>::get_replicas(const String& keyspace_name,
741 const String& routing_key) const {
742 typename KeyspaceReplicaMap::const_iterator ks_it = replicas_.find(keyspace_name);
743
744 if (ks_it != replicas_.end()) {
745 Token token = Partitioner::hash(routing_key);
746 const TokenReplicasVec& replicas = ks_it->second;
747 typename TokenReplicasVec::const_iterator replicas_it =
748 std::upper_bound(replicas.begin(), replicas.end(), TokenReplicas(token, no_replicas_dummy_),
749 TokenReplicasCompare());
750 if (replicas_it != replicas.end()) {
751 return replicas_it->second;
752 } else if (!replicas.empty()) {
753 return replicas.front().second;
754 }
755 }
756
757 return no_replicas_dummy_;
758 }
759
760 template <class Partitioner>
dump(const String & keyspace_name) const761 String TokenMapImpl<Partitioner>::dump(const String& keyspace_name) const {
762 String result;
763 typename KeyspaceReplicaMap::const_iterator ks_it = replicas_.find(keyspace_name);
764 const TokenReplicasVec& replicas = ks_it->second;
765
766 for (typename TokenReplicasVec::const_iterator it = replicas.begin(), end = replicas.end();
767 it != end; ++it) {
768 OStringStream ss;
769 ss << std::setw(20) << it->first << " [ ";
770 const CopyOnWriteHostVec& hosts = it->second;
771 for (HostVec::const_iterator host_it = hosts->begin(), end = hosts->end(); host_it != end;
772 ++host_it) {
773 ss << (*host_it)->address_string() << " ";
774 }
775 ss << "]\n";
776 result.append(ss.str());
777 }
778 return result;
779 }
780
781 template <class Partitioner>
782 const typename TokenMapImpl<Partitioner>::TokenReplicasVec&
token_replicas(const String & keyspace_name) const783 TokenMapImpl<Partitioner>::token_replicas(const String& keyspace_name) const {
784 typename KeyspaceReplicaMap::const_iterator ks_it = replicas_.find(keyspace_name);
785 static TokenReplicasVec not_found;
786 return ks_it != replicas_.end() ? ks_it->second : not_found;
787 }
788
789 template <class Partitioner>
update_keyspace(const VersionNumber & cassandra_version,const ResultResponse * result,bool should_build_replicas)790 void TokenMapImpl<Partitioner>::update_keyspace(const VersionNumber& cassandra_version,
791 const ResultResponse* result,
792 bool should_build_replicas) {
793 ResultIterator rows(result);
794
795 while (rows.next()) {
796 String keyspace_name;
797 const Row* row = rows.row();
798
799 if (!row->get_string_by_name("keyspace_name", &keyspace_name)) {
800 LOG_ERROR("Unable to get column value for 'keyspace_name'");
801 continue;
802 }
803
804 ReplicationStrategy<Partitioner> strategy;
805
806 strategy.init(dc_ids_, cassandra_version, row);
807
808 typename KeyspaceStrategyMap::iterator i = strategies_.find(keyspace_name);
809 if (i == strategies_.end() || i->second != strategy) {
810 if (i == strategies_.end()) {
811 strategies_[keyspace_name] = strategy;
812 } else {
813 i->second = strategy;
814 }
815 if (should_build_replicas) {
816 uint64_t start = uv_hrtime();
817 build_datacenters(hosts_, datacenters_);
818 strategy.build_replicas(tokens_, datacenters_, replicas_[keyspace_name]);
819 LOG_DEBUG("Updated token map with keyspace '%s'. Rebuilt token map with %u hosts and %u "
820 "tokens in %f ms",
821 keyspace_name.c_str(), (unsigned int)hosts_.size(), (unsigned int)tokens_.size(),
822 (double)(uv_hrtime() - start) / (1000.0 * 1000.0));
823 }
824 }
825 }
826 }
827
828 template <class Partitioner>
remove_host_tokens(const Host::Ptr & host)829 void TokenMapImpl<Partitioner>::remove_host_tokens(const Host::Ptr& host) {
830 typename TokenHostVec::iterator last =
831 std::remove_copy_if(tokens_.begin(), tokens_.end(), tokens_.begin(), RemoveTokenHostIf(host));
832 tokens_.resize(last - tokens_.begin());
833 }
834
835 template <class Partitioner>
update_host_ids(const Host::Ptr & host)836 void TokenMapImpl<Partitioner>::update_host_ids(const Host::Ptr& host) {
837 host->set_rack_and_dc_ids(rack_ids_.get(host->rack()), dc_ids_.get(host->dc()));
838 }
839
840 template <class Partitioner>
build_replicas()841 void TokenMapImpl<Partitioner>::build_replicas() {
842 build_datacenters(hosts_, datacenters_);
843 for (typename KeyspaceStrategyMap::const_iterator i = strategies_.begin(),
844 end = strategies_.end();
845 i != end; ++i) {
846 const String& keyspace_name = i->first;
847 const ReplicationStrategy<Partitioner>& strategy = i->second;
848 strategy.build_replicas(tokens_, datacenters_, replicas_[keyspace_name]);
849 LOG_TRACE("Replicas for keyspace '%s':\n%s", keyspace_name.c_str(),
850 dump(keyspace_name).c_str());
851 }
852 }
853
854 }}} // namespace datastax::internal::core
855
856 #endif
857