1// Copyright (c) 2012 The gocql Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package gocql
6
7import (
8	"errors"
9	"net"
10	"time"
11)
12
// PoolConfig configures the connection pool used by the driver. It defaults to
// using a round-robin host selection policy and a round-robin connection
// selection policy for each host.
//
// NOTE(review): only the host selection policy is configurable through this
// struct; there is no field for a connection selection policy.
type PoolConfig struct {
	// HostSelectionPolicy sets the policy for selecting which host to use for a
	// given query (default: RoundRobinHostPolicy())
	HostSelectionPolicy HostSelectionPolicy
}
21
22func (p PoolConfig) buildPool(session *Session) *policyConnPool {
23	return newPolicyConnPool(session)
24}
25
// ClusterConfig is a struct to configure the default cluster implementation
// of gocql. It has a variety of attributes that can be used to modify the
// behavior to fit the most common use cases. Applications that require a
// different setup must implement their own cluster.
//
// The defaults quoted on the fields below are the values assigned by
// NewCluster; a zero-value ClusterConfig does not carry them.
type ClusterConfig struct {
	// Hosts holds the addresses for the initial connections. It is recommended
	// to use the value set in the Cassandra config for broadcast_address or
	// listen_address, an IP address not a domain name. This is because events
	// from Cassandra will use the configured IP address, which is used to index
	// connected hosts. If the domain name specified resolves to more than 1 IP
	// address then the driver may connect multiple times to the same host, and
	// will not mark the node being down or up from events.
	Hosts      []string
	CQLVersion string // CQL version (default: 3.0.0)

	// ProtoVersion sets the version of the native protocol to use. This will
	// enable features in the driver for specific protocol versions; generally
	// this should be set to a known version (2,3,4) for the cluster being
	// connected to.
	//
	// If it is 0 or unset (the default) then the driver will attempt to discover
	// the highest supported protocol for the cluster. In clusters with nodes of
	// different versions the protocol selected is not defined (i.e. it can be
	// any of the versions supported in the cluster).
	ProtoVersion       int
	Timeout            time.Duration                            // connection timeout (default: 600ms)
	ConnectTimeout     time.Duration                            // initial connection timeout, used during initial dial to server (default: 600ms)
	Port               int                                      // port (default: 9042)
	Keyspace           string                                   // initial keyspace (optional)
	NumConns           int                                      // number of connections per host (default: 2)
	Consistency        Consistency                              // default consistency level (default: Quorum)
	Compressor         Compressor                               // compression algorithm (default: nil)
	Authenticator      Authenticator                            // authenticator (default: nil)
	AuthProvider       func(h *HostInfo) (Authenticator, error) // an authenticator factory. Can be used to create alternative authenticators (default: nil)
	RetryPolicy        RetryPolicy                              // Default retry policy to use for queries (default: unset, NewCluster leaves it at its zero value)
	ConvictionPolicy   ConvictionPolicy                         // Decide whether to mark host as down based on the error and host info (default: SimpleConvictionPolicy)
	ReconnectionPolicy ReconnectionPolicy                       // Default reconnection policy to use for reconnecting before trying to mark host as down (default: ConstantReconnectionPolicy with MaxRetries 3 and a 1s Interval)
	SocketKeepalive    time.Duration                            // The keepalive period to use, enabled if > 0 (default: 0)
	MaxPreparedStmts   int                                      // Sets the maximum cache size for prepared statements globally for gocql (default: defaultMaxPreparedStmts)
	MaxRoutingKeyInfo  int                                      // Sets the maximum cache size for query info about statements for each session (default: 1000)
	PageSize           int                                      // Default page size to use for created sessions (default: 5000)
	SerialConsistency  SerialConsistency                        // Sets the consistency for the serial part of queries, values can be either SERIAL or LOCAL_SERIAL (default: unset)
	SslOpts            *SslOptions
	DefaultTimestamp   bool // Sends a client side timestamp for all requests which overrides the timestamp at which it arrives at the server. (default: true, only enabled for protocol 3 and above)
	// PoolConfig configures the underlying connection pool, allowing the
	// configuration of host selection and connection selection policies.
	PoolConfig PoolConfig

	// If not zero, gocql attempts to reconnect known DOWN nodes every
	// ReconnectInterval. (default: 60s)
	ReconnectInterval time.Duration

	// The maximum amount of time to wait for schema agreement in a cluster after
	// receiving a schema change frame. (default: 60s)
	MaxWaitSchemaAgreement time.Duration

	// HostFilter will filter all incoming events for host; any which don't pass
	// the filter will be ignored. If set it will take precedence over any
	// options set via Discovery.
	HostFilter HostFilter

	// AddressTranslator will translate addresses found on peer discovery and/or
	// node change events.
	AddressTranslator AddressTranslator

	// If IgnorePeerAddr is true and the address in system.peers does not match
	// the supplied host by either initial hosts or discovered via events then the
	// host will be replaced with the supplied address.
	//
	// For example if an event comes in with host=10.0.0.1 but when looking up that
	// address in system.local or system.peers returns 127.0.0.1, the peer will be
	// set to 10.0.0.1 which is what will be used to connect to.
	IgnorePeerAddr bool

	// If DisableInitialHostLookup is true then the driver will not attempt to
	// get host info from the system.peers table. The driver will connect to the
	// hosts supplied and will not attempt to look up the hosts' information, so
	// data_centre, rack and token information will not be available and as such
	// host filtering and token aware query routing will not be available.
	DisableInitialHostLookup bool

	// Events configures which events the driver will register for.
	Events struct {
		// disable registering for status events (node up/down)
		DisableNodeStatusEvents bool
		// disable registering for topology events (node added/removed/moved)
		DisableTopologyEvents bool
		// disable registering for schema events (keyspace/table/function removed/created/updated)
		DisableSchemaEvents bool
	}

	// DisableSkipMetadata will override the internal result metadata cache so that the driver does not
	// send skip_metadata for queries. This means that the result will always contain
	// the metadata to parse the rows and will not reuse the metadata from the prepared
	// statement.
	//
	// See https://issues.apache.org/jira/browse/CASSANDRA-10786
	DisableSkipMetadata bool

	// QueryObserver will set the provided query observer on all queries created from this session.
	// Use it to collect metrics / stats from queries by providing an implementation of QueryObserver.
	QueryObserver QueryObserver

	// BatchObserver will set the provided batch observer on all queries created from this session.
	// Use it to collect metrics / stats from batch queries by providing an implementation of BatchObserver.
	BatchObserver BatchObserver

	// ConnectObserver will set the provided connect observer on all queries
	// created from this session.
	ConnectObserver ConnectObserver

	// FrameHeaderObserver will set the provided frame header observer on all frames' headers created from this session.
	// Use it to collect metrics / stats from frames by providing an implementation of FrameHeaderObserver.
	FrameHeaderObserver FrameHeaderObserver

	// DefaultIdempotence is the default idempotence for queries. NewCluster
	// does not set it, so it defaults to false.
	DefaultIdempotence bool

	// The time to wait for frames before flushing the frames connection to Cassandra.
	// Can help reduce syscall overhead by making fewer calls to write. Set to 0 to
	// disable.
	//
	// (default: 200 microseconds)
	WriteCoalesceWaitTime time.Duration

	// internal config for testing
	disableControlConn bool
}
150
151// NewCluster generates a new config for the default cluster implementation.
152//
153// The supplied hosts are used to initially connect to the cluster then the rest of
154// the ring will be automatically discovered. It is recommended to use the value set in
155// the Cassandra config for broadcast_address or listen_address, an IP address not
156// a domain name. This is because events from Cassandra will use the configured IP
157// address, which is used to index connected hosts. If the domain name specified
158// resolves to more than 1 IP address then the driver may connect multiple times to
159// the same host, and will not mark the node being down or up from events.
160func NewCluster(hosts ...string) *ClusterConfig {
161	cfg := &ClusterConfig{
162		Hosts:                  hosts,
163		CQLVersion:             "3.0.0",
164		Timeout:                600 * time.Millisecond,
165		ConnectTimeout:         600 * time.Millisecond,
166		Port:                   9042,
167		NumConns:               2,
168		Consistency:            Quorum,
169		MaxPreparedStmts:       defaultMaxPreparedStmts,
170		MaxRoutingKeyInfo:      1000,
171		PageSize:               5000,
172		DefaultTimestamp:       true,
173		MaxWaitSchemaAgreement: 60 * time.Second,
174		ReconnectInterval:      60 * time.Second,
175		ConvictionPolicy:       &SimpleConvictionPolicy{},
176		ReconnectionPolicy:     &ConstantReconnectionPolicy{MaxRetries: 3, Interval: 1 * time.Second},
177		WriteCoalesceWaitTime:  200 * time.Microsecond,
178	}
179	return cfg
180}
181
182// CreateSession initializes the cluster based on this config and returns a
183// session object that can be used to interact with the database.
184func (cfg *ClusterConfig) CreateSession() (*Session, error) {
185	return NewSession(*cfg)
186}
187
188// translateAddressPort is a helper method that will use the given AddressTranslator
189// if defined, to translate the given address and port into a possibly new address
190// and port, If no AddressTranslator or if an error occurs, the given address and
191// port will be returned.
192func (cfg *ClusterConfig) translateAddressPort(addr net.IP, port int) (net.IP, int) {
193	if cfg.AddressTranslator == nil || len(addr) == 0 {
194		return addr, port
195	}
196	newAddr, newPort := cfg.AddressTranslator.Translate(addr, port)
197	if gocqlDebug {
198		Logger.Printf("gocql: translating address '%v:%d' to '%v:%d'", addr, port, newAddr, newPort)
199	}
200	return newAddr, newPort
201}
202
203func (cfg *ClusterConfig) filterHost(host *HostInfo) bool {
204	return !(cfg.HostFilter == nil || cfg.HostFilter.Accept(host))
205}
206
// Sentinel errors exported by the package.
// NOTE(review): the code that returns these is not in this part of the file;
// the descriptions below follow the error messages — confirm against callers.
var (
	// ErrNoHosts reports that no hosts were provided.
	ErrNoHosts              = errors.New("no hosts provided")
	// ErrNoConnectionsStarted reports that no connections were made when
	// creating the session.
	ErrNoConnectionsStarted = errors.New("no connections were made when creating the session")
	// ErrHostQueryFailed reports that Hosts could not be populated.
	ErrHostQueryFailed      = errors.New("unable to populate Hosts")
)
212