1 /*
2   Copyright (c) DataStax, Inc.
3 
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7 
8   http://www.apache.org/licenses/LICENSE-2.0
9 
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15 */
16 
17 #ifndef DATASTAX_INTERNAL_LOAD_BALANCING_HPP
18 #define DATASTAX_INTERNAL_LOAD_BALANCING_HPP
19 
20 #include "allocated.hpp"
21 #include "cassandra.h"
22 #include "constants.hpp"
23 #include "host.hpp"
24 #include "request.hpp"
25 #include "string.hpp"
26 #include "vector.hpp"
27 
28 #include <uv.h>
29 
30 extern "C" {
31 
/*
 * Identifiers for the individual load balancing states/callbacks.
 * The numeric values are spelled out explicitly; they are the same values
 * the enumerators would receive from implicit numbering.
 */
enum CassBalancingState_ {
  CASS_BALANCING_INIT = 0,
  CASS_BALANCING_CLEANUP = 1,
  CASS_BALANCING_ON_UP = 2,
  CASS_BALANCING_ON_DOWN = 3,
  CASS_BALANCING_ON_ADD = 4,
  CASS_BALANCING_ON_REMOVE = 5,
  CASS_BALANCING_DISTANCE = 6,
  CASS_BALANCING_NEW_QUERY_PLAN = 7
};
typedef enum CassBalancingState_ CassBalancingState;
42 
/*
 * Distance classification that a load balancing policy assigns to a host.
 * CASS_HOST_DISTANCE_IGNORE is the value is_host_ignored() tests for.
 * The numeric values match what implicit numbering would assign.
 */
enum CassHostDistance_ {
  CASS_HOST_DISTANCE_LOCAL = 0,
  CASS_HOST_DISTANCE_REMOTE = 1,
  CASS_HOST_DISTANCE_IGNORE = 2
};
typedef enum CassHostDistance_ CassHostDistance;
48 
49 } // extern "C"
50 
51 namespace datastax { namespace internal {
52 
53 class Random;
54 
55 namespace core {
56 
57 class RequestHandler;
58 class TokenMap;
59 
is_dc_local(CassConsistency cl)60 inline bool is_dc_local(CassConsistency cl) {
61   return cl == CASS_CONSISTENCY_LOCAL_ONE || cl == CASS_CONSISTENCY_LOCAL_QUORUM;
62 }
63 
64 class QueryPlan : public Allocated {
65 public:
~QueryPlan()66   virtual ~QueryPlan() {}
67   virtual Host::Ptr compute_next() = 0;
68 
compute_next(Address * address)69   bool compute_next(Address* address) {
70     Host::Ptr host = compute_next();
71     if (host) {
72       *address = host->address();
73       return true;
74     }
75     return false;
76   }
77 };
78 
79 class LoadBalancingPolicy : public RefCounted<LoadBalancingPolicy> {
80 public:
81   typedef SharedRefPtr<LoadBalancingPolicy> Ptr;
82   typedef Vector<Ptr> Vec;
83 
LoadBalancingPolicy()84   LoadBalancingPolicy()
85       : RefCounted<LoadBalancingPolicy>() {}
86 
~LoadBalancingPolicy()87   virtual ~LoadBalancingPolicy() {}
88 
89   virtual void init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
90                     const String& local_dc) = 0;
91 
register_handles(uv_loop_t * loop)92   virtual void register_handles(uv_loop_t* loop) {}
close_handles()93   virtual void close_handles() {}
94 
95   virtual CassHostDistance distance(const Host::Ptr& host) const = 0;
96 
97   virtual bool is_host_up(const Address& address) const = 0;
98   virtual void on_host_added(const Host::Ptr& host) = 0;
99   virtual void on_host_removed(const Host::Ptr& host) = 0;
100   virtual void on_host_up(const Host::Ptr& host) = 0;
101   virtual void on_host_down(const Address& address) = 0;
102 
103   virtual QueryPlan* new_query_plan(const String& keyspace, RequestHandler* request_handler,
104                                     const TokenMap* token_map) = 0;
105 
106   virtual LoadBalancingPolicy* new_instance() = 0;
107 };
108 
is_host_ignored(const LoadBalancingPolicy::Vec & policies,const Host::Ptr & host)109 inline bool is_host_ignored(const LoadBalancingPolicy::Vec& policies, const Host::Ptr& host) {
110   for (LoadBalancingPolicy::Vec::const_iterator it = policies.begin(), end = policies.end();
111        it != end; ++it) {
112     if ((*it)->distance(host) != CASS_HOST_DISTANCE_IGNORE) {
113       return false;
114     }
115   }
116   return true;
117 }
118 
119 class ChainedLoadBalancingPolicy : public LoadBalancingPolicy {
120 public:
ChainedLoadBalancingPolicy(LoadBalancingPolicy * child_policy)121   ChainedLoadBalancingPolicy(LoadBalancingPolicy* child_policy)
122       : child_policy_(child_policy) {}
123 
~ChainedLoadBalancingPolicy()124   virtual ~ChainedLoadBalancingPolicy() {}
125 
init(const Host::Ptr & connected_host,const HostMap & hosts,Random * random,const String & local_dc)126   virtual void init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
127                     const String& local_dc) {
128     return child_policy_->init(connected_host, hosts, random, local_dc);
129   }
130 
child_policy() const131   virtual const LoadBalancingPolicy::Ptr& child_policy() const { return child_policy_; }
132 
distance(const Host::Ptr & host) const133   virtual CassHostDistance distance(const Host::Ptr& host) const {
134     return child_policy_->distance(host);
135   }
136 
is_host_up(const Address & address) const137   virtual bool is_host_up(const Address& address) const {
138     return child_policy_->is_host_up(address);
139   }
140 
on_host_added(const Host::Ptr & host)141   virtual void on_host_added(const Host::Ptr& host) { child_policy_->on_host_added(host); }
on_host_removed(const Host::Ptr & host)142   virtual void on_host_removed(const Host::Ptr& host) { child_policy_->on_host_removed(host); }
on_host_up(const Host::Ptr & host)143   virtual void on_host_up(const Host::Ptr& host) { child_policy_->on_host_up(host); }
on_host_down(const Address & address)144   virtual void on_host_down(const Address& address) { child_policy_->on_host_down(address); }
145 
146 protected:
147   LoadBalancingPolicy::Ptr child_policy_;
148 };
149 
150 } // namespace core
151 }} // namespace datastax::internal
152 
153 #endif
154