1package consul 2 3import ( 4 "errors" 5 "log" 6 7 "github.com/fabiolb/fabio/config" 8 "github.com/fabiolb/fabio/registry" 9 10 "github.com/hashicorp/consul/api" 11) 12 13// be is an implementation of a registry backend for consul. 14type be struct { 15 c *api.Client 16 dc string 17 cfg *config.Consul 18 dereg map[string](chan bool) 19} 20 21func NewBackend(cfg *config.Consul) (registry.Backend, error) { 22 23 consulCfg := &api.Config{Address: cfg.Addr, Scheme: cfg.Scheme, Token: cfg.Token} 24 if cfg.Scheme == "https" { 25 consulCfg.TLSConfig.KeyFile = cfg.TLS.KeyFile 26 consulCfg.TLSConfig.CertFile = cfg.TLS.CertFile 27 consulCfg.TLSConfig.CAFile = cfg.TLS.CAFile 28 consulCfg.TLSConfig.CAPath = cfg.TLS.CAPath 29 consulCfg.TLSConfig.InsecureSkipVerify = cfg.TLS.InsecureSkipVerify 30 } 31 32 // create a reusable client 33 c, err := api.NewClient(consulCfg) 34 if err != nil { 35 return nil, err 36 } 37 38 // ping the agent 39 dc, err := datacenter(c) 40 if err != nil { 41 return nil, err 42 } 43 44 // we're good 45 log.Printf("[INFO] consul: Connecting to %q in datacenter %q", cfg.Addr, dc) 46 return &be{c: c, dc: dc, cfg: cfg}, nil 47} 48 49func (b *be) Register(services []string) error { 50 if b.dereg == nil { 51 b.dereg = make(map[string](chan bool)) 52 } 53 54 if b.cfg.Register { 55 services = append(services, b.cfg.ServiceName) 56 } 57 58 // deregister unneeded services 59 for service := range b.dereg { 60 if stringInSlice(service, services) { 61 continue 62 } 63 err := b.Deregister(service) 64 if err != nil { 65 return err 66 } 67 } 68 69 // register new services 70 for _, service := range services { 71 if b.dereg[service] != nil { 72 log.Printf("[DEBUG] %q already registered", service) 73 continue 74 } 75 76 serviceReg, err := serviceRegistration(b.cfg, service) 77 if err != nil { 78 return err 79 } 80 81 b.dereg[service] = register(b.c, serviceReg) 82 } 83 84 return nil 85} 86 87func (b *be) Deregister(service string) error { 88 dereg := b.dereg[service] 89 if dereg == nil { 90 log.Printf("[WARN]: Attempted to deregister unknown service %q", service) 91 return nil 92 } 93 dereg <- true // trigger deregistration 94 <-dereg // wait for completion 95 delete(b.dereg, service) 96 97 return nil 98} 99 100func (b *be) DeregisterAll() error { 101 log.Printf("[DEBUG]: consul: Deregistering all registered aliases.") 102 for _, dereg := range b.dereg { 103 if dereg == nil { 104 continue 105 } 106 dereg <- true // trigger deregistration 107 <-dereg // wait for completion 108 } 109 return nil 110} 111 112func (b *be) ManualPaths() ([]string, error) { 113 keys, _, err := listKeys(b.c, b.cfg.KVPath, 0) 114 return keys, err 115} 116 117func (b *be) ReadManual(path string) (value string, version uint64, err error) { 118 // we cannot rely on the value provided by WatchManual() since 119 // someone has to call that method first to kick off the go routine. 120 return getKV(b.c, b.cfg.KVPath+path, 0) 121} 122 123func (b *be) WriteManual(path string, value string, version uint64) (ok bool, err error) { 124 // try to create the key first by using version 0 125 if ok, err = putKV(b.c, b.cfg.KVPath+path, value, 0); ok { 126 return 127 } 128 129 // then try the CAS update 130 return putKV(b.c, b.cfg.KVPath+path, value, version) 131} 132 133func (b *be) WatchServices() chan string { 134 log.Printf("[INFO] consul: Using dynamic routes") 135 log.Printf("[INFO] consul: Using tag prefix %q", b.cfg.TagPrefix) 136 137 m := NewServiceMonitor(b.c, b.cfg, b.dc) 138 svc := make(chan string) 139 go m.Watch(svc) 140 return svc 141} 142 143func (b *be) WatchManual() chan string { 144 log.Printf("[INFO] consul: Watching KV path %q", b.cfg.KVPath) 145 146 kv := make(chan string) 147 go watchKV(b.c, b.cfg.KVPath, kv, true) 148 return kv 149} 150 151func (b *be) WatchNoRouteHTML() chan string { 152 log.Printf("[INFO] consul: Watching KV path %q", b.cfg.NoRouteHTMLPath) 153 154 html := make(chan string) 155 go watchKV(b.c, b.cfg.NoRouteHTMLPath, html, false) 156 return html 157} 158 159// datacenter returns the datacenter of the local agent 160func datacenter(c *api.Client) (string, error) { 161 self, err := c.Agent().Self() 162 if err != nil { 163 return "", err 164 } 165 166 cfg, ok := self["Config"] 167 if !ok { 168 return "", errors.New("consul: self.Config not found") 169 } 170 dc, ok := cfg["Datacenter"].(string) 171 if !ok { 172 return "", errors.New("consul: self.Datacenter not found") 173 } 174 return dc, nil 175} 176 177func stringInSlice(str string, strSlice []string) bool { 178 for _, s := range strSlice { 179 if s == str { 180 return true 181 } 182 } 183 return false 184} 185