1package eureka 2 3import ( 4 "fmt" 5 "net/http" 6 "sync" 7 "time" 8 9 "github.com/hudl/fargo" 10 11 "github.com/go-kit/kit/log" 12 "github.com/go-kit/kit/sd" 13) 14 15// Matches official Netflix Java client default. 16const defaultRenewalInterval = 30 * time.Second 17 18// The methods of fargo.Connection used in this package. 19type fargoConnection interface { 20 RegisterInstance(instance *fargo.Instance) error 21 DeregisterInstance(instance *fargo.Instance) error 22 ReregisterInstance(instance *fargo.Instance) error 23 HeartBeatInstance(instance *fargo.Instance) error 24 ScheduleAppUpdates(name string, await bool, done <-chan struct{}) <-chan fargo.AppUpdate 25 GetApp(name string) (*fargo.Application, error) 26} 27 28type fargoUnsuccessfulHTTPResponse struct { 29 statusCode int 30 messagePrefix string 31} 32 33func (u *fargoUnsuccessfulHTTPResponse) Error() string { 34 return fmt.Sprintf("err=%s code=%d", u.messagePrefix, u.statusCode) 35} 36 37// Registrar maintains service instance liveness information in Eureka. 38type Registrar struct { 39 conn fargoConnection 40 instance *fargo.Instance 41 logger log.Logger 42 quitc chan chan struct{} 43 sync.Mutex 44} 45 46var _ sd.Registrar = (*Registrar)(nil) 47 48// NewRegistrar returns an Eureka Registrar acting on behalf of the provided 49// Fargo connection and instance. See the integration test for usage examples. 50func NewRegistrar(conn fargoConnection, instance *fargo.Instance, logger log.Logger) *Registrar { 51 return &Registrar{ 52 conn: conn, 53 instance: instance, 54 logger: log.With(logger, "service", instance.App, "address", fmt.Sprintf("%s:%d", instance.IPAddr, instance.Port)), 55 } 56} 57 58// Register implements sd.Registrar. 59func (r *Registrar) Register() { 60 r.Lock() 61 defer r.Unlock() 62 63 if r.quitc != nil { 64 return // Already in the registration loop. 65 } 66 67 if err := r.conn.RegisterInstance(r.instance); err != nil { 68 r.logger.Log("during", "Register", "err", err) 69 } 70 71 r.quitc = make(chan chan struct{}) 72 go r.loop() 73} 74 75// Deregister implements sd.Registrar. 76func (r *Registrar) Deregister() { 77 r.Lock() 78 defer r.Unlock() 79 80 if r.quitc == nil { 81 return // Already deregistered. 82 } 83 84 q := make(chan struct{}) 85 r.quitc <- q 86 <-q 87 r.quitc = nil 88} 89 90func (r *Registrar) loop() { 91 var renewalInterval time.Duration 92 if r.instance.LeaseInfo.RenewalIntervalInSecs > 0 { 93 renewalInterval = time.Duration(r.instance.LeaseInfo.RenewalIntervalInSecs) * time.Second 94 } else { 95 renewalInterval = defaultRenewalInterval 96 } 97 ticker := time.NewTicker(renewalInterval) 98 defer ticker.Stop() 99 100 for { 101 select { 102 case <-ticker.C: 103 if err := r.heartbeat(); err != nil { 104 r.logger.Log("during", "heartbeat", "err", err) 105 } 106 107 case q := <-r.quitc: 108 if err := r.conn.DeregisterInstance(r.instance); err != nil { 109 r.logger.Log("during", "Deregister", "err", err) 110 } 111 close(q) 112 return 113 } 114 } 115} 116 117func httpResponseStatusCode(err error) (code int, present bool) { 118 if code, ok := fargo.HTTPResponseStatusCode(err); ok { 119 return code, true 120 } 121 // Allow injection of errors for testing. 122 if u, ok := err.(*fargoUnsuccessfulHTTPResponse); ok { 123 return u.statusCode, true 124 } 125 return 0, false 126} 127 128func isNotFound(err error) bool { 129 code, ok := httpResponseStatusCode(err) 130 return ok && code == http.StatusNotFound 131} 132 133func (r *Registrar) heartbeat() error { 134 err := r.conn.HeartBeatInstance(r.instance) 135 if err == nil { 136 return nil 137 } 138 if isNotFound(err) { 139 // Instance expired (e.g. network partition). Re-register. 140 return r.conn.ReregisterInstance(r.instance) 141 } 142 return err 143} 144