1// Copyright (c) 2012 The gocql Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style 3// license that can be found in the LICENSE file. 4 5package gocql 6 7import ( 8 "errors" 9 "net" 10 "time" 11) 12 13// PoolConfig configures the connection pool used by the driver, it defaults to 14// using a round-robin host selection policy and a round-robin connection selection 15// policy for each host. 16type PoolConfig struct { 17 // HostSelectionPolicy sets the policy for selecting which host to use for a 18 // given query (default: RoundRobinHostPolicy()) 19 HostSelectionPolicy HostSelectionPolicy 20} 21 22func (p PoolConfig) buildPool(session *Session) *policyConnPool { 23 return newPolicyConnPool(session) 24} 25 26// ClusterConfig is a struct to configure the default cluster implementation 27// of gocql. It has a variety of attributes that can be used to modify the 28// behavior to fit the most common use cases. Applications that require a 29// different setup must implement their own cluster. 30type ClusterConfig struct { 31 // addresses for the initial connections. It is recommended to use the value set in 32 // the Cassandra config for broadcast_address or listen_address, an IP address not 33 // a domain name. This is because events from Cassandra will use the configured IP 34 // address, which is used to index connected hosts. If the domain name specified 35 // resolves to more than 1 IP address then the driver may connect multiple times to 36 // the same host, and will not mark the node being down or up from events. 37 Hosts []string 38 CQLVersion string // CQL version (default: 3.0.0) 39 40 // ProtoVersion sets the version of the native protocol to use, this will 41 // enable features in the driver for specific protocol versions, generally this 42 // should be set to a known version (2,3,4) for the cluster being connected to. 43 // 44 // If it is 0 or unset (the default) then the driver will attempt to discover the 45 // highest supported protocol for the cluster. In clusters with nodes of different 46 // versions the protocol selected is not defined (ie, it can be any of the supported in the cluster) 47 ProtoVersion int 48 Timeout time.Duration // connection timeout (default: 600ms) 49 ConnectTimeout time.Duration // initial connection timeout, used during initial dial to server (default: 600ms) 50 Port int // port (default: 9042) 51 Keyspace string // initial keyspace (optional) 52 NumConns int // number of connections per host (default: 2) 53 Consistency Consistency // default consistency level (default: Quorum) 54 Compressor Compressor // compression algorithm (default: nil) 55 Authenticator Authenticator // authenticator (default: nil) 56 AuthProvider func(h *HostInfo) (Authenticator, error) // an authenticator factory. Can be used to create alternative authenticators (default: nil) 57 RetryPolicy RetryPolicy // Default retry policy to use for queries (default: 0) 58 ConvictionPolicy ConvictionPolicy // Decide whether to mark host as down based on the error and host info (default: SimpleConvictionPolicy) 59 ReconnectionPolicy ReconnectionPolicy // Default reconnection policy to use for reconnecting before trying to mark host as down (default: see below) 60 SocketKeepalive time.Duration // The keepalive period to use, enabled if > 0 (default: 0) 61 MaxPreparedStmts int // Sets the maximum cache size for prepared statements globally for gocql (default: 1000) 62 MaxRoutingKeyInfo int // Sets the maximum cache size for query info about statements for each session (default: 1000) 63 PageSize int // Default page size to use for created sessions (default: 5000) 64 SerialConsistency SerialConsistency // Sets the consistency for the serial part of queries, values can be either SERIAL or LOCAL_SERIAL (default: unset) 65 SslOpts *SslOptions 66 DefaultTimestamp bool // Sends a client side timestamp for all requests which overrides the timestamp at which it arrives at the server. (default: true, only enabled for protocol 3 and above) 67 // PoolConfig configures the underlying connection pool, allowing the 68 // configuration of host selection and connection selection policies. 69 PoolConfig PoolConfig 70 71 // If not zero, gocql attempt to reconnect known DOWN nodes in every ReconnectInterval. 72 ReconnectInterval time.Duration 73 74 // The maximum amount of time to wait for schema agreement in a cluster after 75 // receiving a schema change frame. (default: 60s) 76 MaxWaitSchemaAgreement time.Duration 77 78 // HostFilter will filter all incoming events for host, any which don't pass 79 // the filter will be ignored. If set will take precedence over any options set 80 // via Discovery 81 HostFilter HostFilter 82 83 // AddressTranslator will translate addresses found on peer discovery and/or 84 // node change events. 85 AddressTranslator AddressTranslator 86 87 // If IgnorePeerAddr is true and the address in system.peers does not match 88 // the supplied host by either initial hosts or discovered via events then the 89 // host will be replaced with the supplied address. 90 // 91 // For example if an event comes in with host=10.0.0.1 but when looking up that 92 // address in system.local or system.peers returns 127.0.0.1, the peer will be 93 // set to 10.0.0.1 which is what will be used to connect to. 94 IgnorePeerAddr bool 95 96 // If DisableInitialHostLookup then the driver will not attempt to get host info 97 // from the system.peers table, this will mean that the driver will connect to 98 // hosts supplied and will not attempt to lookup the hosts information, this will 99 // mean that data_centre, rack and token information will not be available and as 100 // such host filtering and token aware query routing will not be available. 101 DisableInitialHostLookup bool 102 103 // Configure events the driver will register for 104 Events struct { 105 // disable registering for status events (node up/down) 106 DisableNodeStatusEvents bool 107 // disable registering for topology events (node added/removed/moved) 108 DisableTopologyEvents bool 109 // disable registering for schema events (keyspace/table/function removed/created/updated) 110 DisableSchemaEvents bool 111 } 112 113 // DisableSkipMetadata will override the internal result metadata cache so that the driver does not 114 // send skip_metadata for queries, this means that the result will always contain 115 // the metadata to parse the rows and will not reuse the metadata from the prepared 116 // statement. 117 // 118 // See https://issues.apache.org/jira/browse/CASSANDRA-10786 119 DisableSkipMetadata bool 120 121 // QueryObserver will set the provided query observer on all queries created from this session. 122 // Use it to collect metrics / stats from queries by providing an implementation of QueryObserver. 123 QueryObserver QueryObserver 124 125 // BatchObserver will set the provided batch observer on all queries created from this session. 126 // Use it to collect metrics / stats from batch queries by providing an implementation of BatchObserver. 127 BatchObserver BatchObserver 128 129 // ConnectObserver will set the provided connect observer on all queries 130 // created from this session. 131 ConnectObserver ConnectObserver 132 133 // FrameHeaderObserver will set the provided frame header observer on all frames' headers created from this session. 134 // Use it to collect metrics / stats from frames by providing an implementation of FrameHeaderObserver. 135 FrameHeaderObserver FrameHeaderObserver 136 137 // Default idempotence for queries 138 DefaultIdempotence bool 139 140 // The time to wait for frames before flushing the frames connection to Cassandra. 141 // Can help reduce syscall overhead by making less calls to write. Set to 0 to 142 // disable. 143 // 144 // (default: 200 microseconds) 145 WriteCoalesceWaitTime time.Duration 146 147 // internal config for testing 148 disableControlConn bool 149} 150 151// NewCluster generates a new config for the default cluster implementation. 152// 153// The supplied hosts are used to initially connect to the cluster then the rest of 154// the ring will be automatically discovered. It is recommended to use the value set in 155// the Cassandra config for broadcast_address or listen_address, an IP address not 156// a domain name. This is because events from Cassandra will use the configured IP 157// address, which is used to index connected hosts. If the domain name specified 158// resolves to more than 1 IP address then the driver may connect multiple times to 159// the same host, and will not mark the node being down or up from events. 160func NewCluster(hosts ...string) *ClusterConfig { 161 cfg := &ClusterConfig{ 162 Hosts: hosts, 163 CQLVersion: "3.0.0", 164 Timeout: 600 * time.Millisecond, 165 ConnectTimeout: 600 * time.Millisecond, 166 Port: 9042, 167 NumConns: 2, 168 Consistency: Quorum, 169 MaxPreparedStmts: defaultMaxPreparedStmts, 170 MaxRoutingKeyInfo: 1000, 171 PageSize: 5000, 172 DefaultTimestamp: true, 173 MaxWaitSchemaAgreement: 60 * time.Second, 174 ReconnectInterval: 60 * time.Second, 175 ConvictionPolicy: &SimpleConvictionPolicy{}, 176 ReconnectionPolicy: &ConstantReconnectionPolicy{MaxRetries: 3, Interval: 1 * time.Second}, 177 WriteCoalesceWaitTime: 200 * time.Microsecond, 178 } 179 return cfg 180} 181 182// CreateSession initializes the cluster based on this config and returns a 183// session object that can be used to interact with the database. 184func (cfg *ClusterConfig) CreateSession() (*Session, error) { 185 return NewSession(*cfg) 186} 187 188// translateAddressPort is a helper method that will use the given AddressTranslator 189// if defined, to translate the given address and port into a possibly new address 190// and port, If no AddressTranslator or if an error occurs, the given address and 191// port will be returned. 192func (cfg *ClusterConfig) translateAddressPort(addr net.IP, port int) (net.IP, int) { 193 if cfg.AddressTranslator == nil || len(addr) == 0 { 194 return addr, port 195 } 196 newAddr, newPort := cfg.AddressTranslator.Translate(addr, port) 197 if gocqlDebug { 198 Logger.Printf("gocql: translating address '%v:%d' to '%v:%d'", addr, port, newAddr, newPort) 199 } 200 return newAddr, newPort 201} 202 203func (cfg *ClusterConfig) filterHost(host *HostInfo) bool { 204 return !(cfg.HostFilter == nil || cfg.HostFilter.Accept(host)) 205} 206 207var ( 208 ErrNoHosts = errors.New("no hosts provided") 209 ErrNoConnectionsStarted = errors.New("no connections were made when creating the session") 210 ErrHostQueryFailed = errors.New("unable to populate Hosts") 211) 212