1package couchbase 2 3import ( 4 "encoding/json" 5 "fmt" 6 "github.com/couchbase/goutils/logging" 7 "io" 8 "io/ioutil" 9 "net" 10 "net/http" 11 "time" 12 "unsafe" 13) 14 15// Bucket auto-updater gets the latest version of the bucket config from 16// the server. If the configuration has changed then updated the local 17// bucket information. If the bucket has been deleted then notify anyone 18// who is holding a reference to this bucket 19 20const MAX_RETRY_COUNT = 5 21const DISCONNECT_PERIOD = 120 * time.Second 22 23type NotifyFn func(bucket string, err error) 24type StreamingFn func(bucket *Bucket) 25 26// Use TCP keepalive to detect half close sockets 27var updaterTransport http.RoundTripper = &http.Transport{ 28 Proxy: http.ProxyFromEnvironment, 29 Dial: (&net.Dialer{ 30 Timeout: 30 * time.Second, 31 KeepAlive: 30 * time.Second, 32 }).Dial, 33} 34 35var updaterHTTPClient = &http.Client{Transport: updaterTransport} 36 37func doHTTPRequestForUpdate(req *http.Request) (*http.Response, error) { 38 39 var err error 40 var res *http.Response 41 42 for i := 0; i < HTTP_MAX_RETRY; i++ { 43 res, err = updaterHTTPClient.Do(req) 44 if err != nil && isHttpConnError(err) { 45 continue 46 } 47 break 48 } 49 50 if err != nil { 51 return nil, err 52 } 53 54 return res, err 55} 56 57func (b *Bucket) RunBucketUpdater(notify NotifyFn) { 58 b.RunBucketUpdater2(nil, notify) 59} 60 61func (b *Bucket) RunBucketUpdater2(streamingFn StreamingFn, notify NotifyFn) { 62 go func() { 63 err := b.UpdateBucket2(streamingFn) 64 if err != nil { 65 if notify != nil { 66 notify(b.GetName(), err) 67 } 68 logging.Errorf(" Bucket Updater exited with err %v", err) 69 } 70 }() 71} 72 73func (b *Bucket) replaceConnPools2(with []*connectionPool, bucketLocked bool) { 74 if !bucketLocked { 75 b.Lock() 76 defer b.Unlock() 77 } 78 old := b.connPools 79 b.connPools = unsafe.Pointer(&with) 80 if old != nil { 81 for _, pool := range *(*[]*connectionPool)(old) { 82 if pool != nil && pool.inUse == false { 83 pool.Close() 84 } 85 } 86 } 87 return 88} 89 90func (b *Bucket) UpdateBucket() error { 91 return b.UpdateBucket2(nil) 92} 93 94func (b *Bucket) UpdateBucket2(streamingFn StreamingFn) error { 95 var failures int 96 var returnErr error 97 var poolServices PoolServices 98 99 for { 100 101 if failures == MAX_RETRY_COUNT { 102 logging.Errorf(" Maximum failures reached. Exiting loop...") 103 return fmt.Errorf("Max failures reached. Last Error %v", returnErr) 104 } 105 106 nodes := b.Nodes() 107 if len(nodes) < 1 { 108 return fmt.Errorf("No healthy nodes found") 109 } 110 111 streamUrl := fmt.Sprintf("%s/pools/default/bucketsStreaming/%s", b.pool.client.BaseURL, uriAdj(b.GetName())) 112 logging.Infof(" Trying with %s", streamUrl) 113 req, err := http.NewRequest("GET", streamUrl, nil) 114 if err != nil { 115 return err 116 } 117 118 // Lock here to avoid having pool closed under us. 119 b.RLock() 120 err = maybeAddAuth(req, b.pool.client.ah) 121 b.RUnlock() 122 if err != nil { 123 return err 124 } 125 126 res, err := doHTTPRequestForUpdate(req) 127 if err != nil { 128 return err 129 } 130 131 if res.StatusCode != 200 { 132 bod, _ := ioutil.ReadAll(io.LimitReader(res.Body, 512)) 133 logging.Errorf("Failed to connect to host, unexpected status code: %v. Body %s", res.StatusCode, bod) 134 res.Body.Close() 135 returnErr = fmt.Errorf("Failed to connect to host. Status %v Body %s", res.StatusCode, bod) 136 failures++ 137 continue 138 } 139 140 dec := json.NewDecoder(res.Body) 141 142 tmpb := &Bucket{} 143 for { 144 145 err := dec.Decode(&tmpb) 146 if err != nil { 147 returnErr = err 148 res.Body.Close() 149 break 150 } 151 152 // if we got here, reset failure count 153 failures = 0 154 155 if b.pool.client.tlsConfig != nil { 156 poolServices, err = b.pool.client.GetPoolServices("default") 157 if err != nil { 158 returnErr = err 159 res.Body.Close() 160 break 161 } 162 } 163 164 b.Lock() 165 166 // mark all the old connection pools for deletion 167 pools := b.getConnPools(true /* already locked */) 168 for _, pool := range pools { 169 if pool != nil { 170 pool.inUse = false 171 } 172 } 173 174 newcps := make([]*connectionPool, len(tmpb.VBSMJson.ServerList)) 175 for i := range newcps { 176 // get the old connection pool and check if it is still valid 177 pool := b.getConnPoolByHost(tmpb.VBSMJson.ServerList[i], true /* bucket already locked */) 178 if pool != nil && pool.inUse == false && pool.tlsConfig == b.pool.client.tlsConfig { 179 // if the hostname and index is unchanged then reuse this pool 180 newcps[i] = pool 181 pool.inUse = true 182 continue 183 } 184 // else create a new pool 185 var encrypted bool 186 hostport := tmpb.VBSMJson.ServerList[i] 187 if b.pool.client.tlsConfig != nil { 188 hostport, encrypted, err = MapKVtoSSL(hostport, &poolServices) 189 if err != nil { 190 b.Unlock() 191 return err 192 } 193 } 194 if b.ah != nil { 195 newcps[i] = newConnectionPool(hostport, 196 b.ah, false, PoolSize, PoolOverflow, b.pool.client.tlsConfig, b.Name, encrypted) 197 198 } else { 199 newcps[i] = newConnectionPool(hostport, 200 b.authHandler(true /* bucket already locked */), 201 false, PoolSize, PoolOverflow, b.pool.client.tlsConfig, b.Name, encrypted) 202 } 203 } 204 205 b.replaceConnPools2(newcps, true /* bucket already locked */) 206 207 tmpb.ah = b.ah 208 b.vBucketServerMap = unsafe.Pointer(&tmpb.VBSMJson) 209 b.nodeList = unsafe.Pointer(&tmpb.NodesJSON) 210 b.Unlock() 211 212 if streamingFn != nil { 213 streamingFn(tmpb) 214 } 215 logging.Debugf("Got new configuration for bucket %s", b.GetName()) 216 217 } 218 // we are here because of an error 219 failures++ 220 continue 221 222 } 223 return nil 224} 225