1package couchbase
2
3import (
4	"encoding/json"
5	"fmt"
6	"github.com/couchbase/goutils/logging"
7	"io"
8	"io/ioutil"
9	"net"
10	"net/http"
11	"time"
12	"unsafe"
13)
14
15// Bucket auto-updater gets the latest version of the bucket config from
16// the server. If the configuration has changed then updated the local
17// bucket information. If the bucket has been deleted then notify anyone
18// who is holding a reference to this bucket
19
20const MAX_RETRY_COUNT = 5
21const DISCONNECT_PERIOD = 120 * time.Second
22
23type NotifyFn func(bucket string, err error)
24type StreamingFn func(bucket *Bucket)
25
26// Use TCP keepalive to detect half close sockets
27var updaterTransport http.RoundTripper = &http.Transport{
28	Proxy: http.ProxyFromEnvironment,
29	Dial: (&net.Dialer{
30		Timeout:   30 * time.Second,
31		KeepAlive: 30 * time.Second,
32	}).Dial,
33}
34
35var updaterHTTPClient = &http.Client{Transport: updaterTransport}
36
37func doHTTPRequestForUpdate(req *http.Request) (*http.Response, error) {
38
39	var err error
40	var res *http.Response
41
42	for i := 0; i < HTTP_MAX_RETRY; i++ {
43		res, err = updaterHTTPClient.Do(req)
44		if err != nil && isHttpConnError(err) {
45			continue
46		}
47		break
48	}
49
50	if err != nil {
51		return nil, err
52	}
53
54	return res, err
55}
56
57func (b *Bucket) RunBucketUpdater(notify NotifyFn) {
58	b.RunBucketUpdater2(nil, notify)
59}
60
61func (b *Bucket) RunBucketUpdater2(streamingFn StreamingFn, notify NotifyFn) {
62	go func() {
63		err := b.UpdateBucket2(streamingFn)
64		if err != nil {
65			if notify != nil {
66				notify(b.GetName(), err)
67			}
68			logging.Errorf(" Bucket Updater exited with err %v", err)
69		}
70	}()
71}
72
73func (b *Bucket) replaceConnPools2(with []*connectionPool, bucketLocked bool) {
74	if !bucketLocked {
75		b.Lock()
76		defer b.Unlock()
77	}
78	old := b.connPools
79	b.connPools = unsafe.Pointer(&with)
80	if old != nil {
81		for _, pool := range *(*[]*connectionPool)(old) {
82			if pool != nil && pool.inUse == false {
83				pool.Close()
84			}
85		}
86	}
87	return
88}
89
90func (b *Bucket) UpdateBucket() error {
91	return b.UpdateBucket2(nil)
92}
93
94func (b *Bucket) UpdateBucket2(streamingFn StreamingFn) error {
95	var failures int
96	var returnErr error
97	var poolServices PoolServices
98
99	for {
100
101		if failures == MAX_RETRY_COUNT {
102			logging.Errorf(" Maximum failures reached. Exiting loop...")
103			return fmt.Errorf("Max failures reached. Last Error %v", returnErr)
104		}
105
106		nodes := b.Nodes()
107		if len(nodes) < 1 {
108			return fmt.Errorf("No healthy nodes found")
109		}
110
111		streamUrl := fmt.Sprintf("%s/pools/default/bucketsStreaming/%s", b.pool.client.BaseURL, uriAdj(b.GetName()))
112		logging.Infof(" Trying with %s", streamUrl)
113		req, err := http.NewRequest("GET", streamUrl, nil)
114		if err != nil {
115			return err
116		}
117
118		// Lock here to avoid having pool closed under us.
119		b.RLock()
120		err = maybeAddAuth(req, b.pool.client.ah)
121		b.RUnlock()
122		if err != nil {
123			return err
124		}
125
126		res, err := doHTTPRequestForUpdate(req)
127		if err != nil {
128			return err
129		}
130
131		if res.StatusCode != 200 {
132			bod, _ := ioutil.ReadAll(io.LimitReader(res.Body, 512))
133			logging.Errorf("Failed to connect to host, unexpected status code: %v. Body %s", res.StatusCode, bod)
134			res.Body.Close()
135			returnErr = fmt.Errorf("Failed to connect to host. Status %v Body %s", res.StatusCode, bod)
136			failures++
137			continue
138		}
139
140		dec := json.NewDecoder(res.Body)
141
142		tmpb := &Bucket{}
143		for {
144
145			err := dec.Decode(&tmpb)
146			if err != nil {
147				returnErr = err
148				res.Body.Close()
149				break
150			}
151
152			// if we got here, reset failure count
153			failures = 0
154
155			if b.pool.client.tlsConfig != nil {
156				poolServices, err = b.pool.client.GetPoolServices("default")
157				if err != nil {
158					returnErr = err
159					res.Body.Close()
160					break
161				}
162			}
163
164			b.Lock()
165
166			// mark all the old connection pools for deletion
167			pools := b.getConnPools(true /* already locked */)
168			for _, pool := range pools {
169				if pool != nil {
170					pool.inUse = false
171				}
172			}
173
174			newcps := make([]*connectionPool, len(tmpb.VBSMJson.ServerList))
175			for i := range newcps {
176				// get the old connection pool and check if it is still valid
177				pool := b.getConnPoolByHost(tmpb.VBSMJson.ServerList[i], true /* bucket already locked */)
178				if pool != nil && pool.inUse == false && pool.tlsConfig == b.pool.client.tlsConfig {
179					// if the hostname and index is unchanged then reuse this pool
180					newcps[i] = pool
181					pool.inUse = true
182					continue
183				}
184				// else create a new pool
185				var encrypted bool
186				hostport := tmpb.VBSMJson.ServerList[i]
187				if b.pool.client.tlsConfig != nil {
188					hostport, encrypted, err = MapKVtoSSL(hostport, &poolServices)
189					if err != nil {
190						b.Unlock()
191						return err
192					}
193				}
194				if b.ah != nil {
195					newcps[i] = newConnectionPool(hostport,
196						b.ah, false, PoolSize, PoolOverflow, b.pool.client.tlsConfig, b.Name, encrypted)
197
198				} else {
199					newcps[i] = newConnectionPool(hostport,
200						b.authHandler(true /* bucket already locked */),
201						false, PoolSize, PoolOverflow, b.pool.client.tlsConfig, b.Name, encrypted)
202				}
203			}
204
205			b.replaceConnPools2(newcps, true /* bucket already locked */)
206
207			tmpb.ah = b.ah
208			b.vBucketServerMap = unsafe.Pointer(&tmpb.VBSMJson)
209			b.nodeList = unsafe.Pointer(&tmpb.NodesJSON)
210			b.Unlock()
211
212			if streamingFn != nil {
213				streamingFn(tmpb)
214			}
215			logging.Debugf("Got new configuration for bucket %s", b.GetName())
216
217		}
218		// we are here because of an error
219		failures++
220		continue
221
222	}
223	return nil
224}
225