1 /*
2 Copyright (c) DataStax, Inc.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #ifndef DATASTAX_INTERNAL_LOAD_BALANCING_HPP
18 #define DATASTAX_INTERNAL_LOAD_BALANCING_HPP
19
20 #include "allocated.hpp"
21 #include "cassandra.h"
22 #include "constants.hpp"
23 #include "host.hpp"
24 #include "request.hpp"
25 #include "string.hpp"
26 #include "vector.hpp"
27
28 #include <uv.h>
29
30 extern "C" {
31
32 typedef enum CassBalancingState_ {
33 CASS_BALANCING_INIT,
34 CASS_BALANCING_CLEANUP,
35 CASS_BALANCING_ON_UP,
36 CASS_BALANCING_ON_DOWN,
37 CASS_BALANCING_ON_ADD,
38 CASS_BALANCING_ON_REMOVE,
39 CASS_BALANCING_DISTANCE,
40 CASS_BALANCING_NEW_QUERY_PLAN
41 } CassBalancingState;
42
43 typedef enum CassHostDistance_ {
44 CASS_HOST_DISTANCE_LOCAL,
45 CASS_HOST_DISTANCE_REMOTE,
46 CASS_HOST_DISTANCE_IGNORE
47 } CassHostDistance;
48
49 } // extern "C"
50
51 namespace datastax { namespace internal {
52
53 class Random;
54
55 namespace core {
56
57 class RequestHandler;
58 class TokenMap;
59
is_dc_local(CassConsistency cl)60 inline bool is_dc_local(CassConsistency cl) {
61 return cl == CASS_CONSISTENCY_LOCAL_ONE || cl == CASS_CONSISTENCY_LOCAL_QUORUM;
62 }
63
64 class QueryPlan : public Allocated {
65 public:
~QueryPlan()66 virtual ~QueryPlan() {}
67 virtual Host::Ptr compute_next() = 0;
68
compute_next(Address * address)69 bool compute_next(Address* address) {
70 Host::Ptr host = compute_next();
71 if (host) {
72 *address = host->address();
73 return true;
74 }
75 return false;
76 }
77 };
78
79 class LoadBalancingPolicy : public RefCounted<LoadBalancingPolicy> {
80 public:
81 typedef SharedRefPtr<LoadBalancingPolicy> Ptr;
82 typedef Vector<Ptr> Vec;
83
LoadBalancingPolicy()84 LoadBalancingPolicy()
85 : RefCounted<LoadBalancingPolicy>() {}
86
~LoadBalancingPolicy()87 virtual ~LoadBalancingPolicy() {}
88
89 virtual void init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
90 const String& local_dc) = 0;
91
register_handles(uv_loop_t * loop)92 virtual void register_handles(uv_loop_t* loop) {}
close_handles()93 virtual void close_handles() {}
94
95 virtual CassHostDistance distance(const Host::Ptr& host) const = 0;
96
97 virtual bool is_host_up(const Address& address) const = 0;
98 virtual void on_host_added(const Host::Ptr& host) = 0;
99 virtual void on_host_removed(const Host::Ptr& host) = 0;
100 virtual void on_host_up(const Host::Ptr& host) = 0;
101 virtual void on_host_down(const Address& address) = 0;
102
103 virtual QueryPlan* new_query_plan(const String& keyspace, RequestHandler* request_handler,
104 const TokenMap* token_map) = 0;
105
106 virtual LoadBalancingPolicy* new_instance() = 0;
107 };
108
is_host_ignored(const LoadBalancingPolicy::Vec & policies,const Host::Ptr & host)109 inline bool is_host_ignored(const LoadBalancingPolicy::Vec& policies, const Host::Ptr& host) {
110 for (LoadBalancingPolicy::Vec::const_iterator it = policies.begin(), end = policies.end();
111 it != end; ++it) {
112 if ((*it)->distance(host) != CASS_HOST_DISTANCE_IGNORE) {
113 return false;
114 }
115 }
116 return true;
117 }
118
119 class ChainedLoadBalancingPolicy : public LoadBalancingPolicy {
120 public:
ChainedLoadBalancingPolicy(LoadBalancingPolicy * child_policy)121 ChainedLoadBalancingPolicy(LoadBalancingPolicy* child_policy)
122 : child_policy_(child_policy) {}
123
~ChainedLoadBalancingPolicy()124 virtual ~ChainedLoadBalancingPolicy() {}
125
init(const Host::Ptr & connected_host,const HostMap & hosts,Random * random,const String & local_dc)126 virtual void init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
127 const String& local_dc) {
128 return child_policy_->init(connected_host, hosts, random, local_dc);
129 }
130
child_policy() const131 virtual const LoadBalancingPolicy::Ptr& child_policy() const { return child_policy_; }
132
distance(const Host::Ptr & host) const133 virtual CassHostDistance distance(const Host::Ptr& host) const {
134 return child_policy_->distance(host);
135 }
136
is_host_up(const Address & address) const137 virtual bool is_host_up(const Address& address) const {
138 return child_policy_->is_host_up(address);
139 }
140
on_host_added(const Host::Ptr & host)141 virtual void on_host_added(const Host::Ptr& host) { child_policy_->on_host_added(host); }
on_host_removed(const Host::Ptr & host)142 virtual void on_host_removed(const Host::Ptr& host) { child_policy_->on_host_removed(host); }
on_host_up(const Host::Ptr & host)143 virtual void on_host_up(const Host::Ptr& host) { child_policy_->on_host_up(host); }
on_host_down(const Address & address)144 virtual void on_host_down(const Address& address) { child_policy_->on_host_down(address); }
145
146 protected:
147 LoadBalancingPolicy::Ptr child_policy_;
148 };
149
150 } // namespace core
151 }} // namespace datastax::internal
152
153 #endif
154