1package eureka
2
3import (
4	"fmt"
5	"net/http"
6	"sync"
7	"time"
8
9	"github.com/hudl/fargo"
10
11	"github.com/go-kit/kit/log"
12	"github.com/go-kit/kit/sd"
13)
14
// defaultRenewalInterval is the heartbeat period used when the instance's
// lease info does not carry a positive renewal interval. The value matches
// the default of the official Netflix Java client.
const defaultRenewalInterval time.Duration = 30 * time.Second
17
18// The methods of fargo.Connection used in this package.
19type fargoConnection interface {
20	RegisterInstance(instance *fargo.Instance) error
21	DeregisterInstance(instance *fargo.Instance) error
22	ReregisterInstance(instance *fargo.Instance) error
23	HeartBeatInstance(instance *fargo.Instance) error
24	ScheduleAppUpdates(name string, await bool, done <-chan struct{}) <-chan fargo.AppUpdate
25	GetApp(name string) (*fargo.Application, error)
26}
27
// fargoUnsuccessfulHTTPResponse is an error value that carries an HTTP status
// code together with a message prefix. httpResponseStatusCode recognizes it,
// which allows status-code errors to be injected for testing.
type fargoUnsuccessfulHTTPResponse struct {
	statusCode    int
	messagePrefix string
}

// Error implements the error interface, rendering the message prefix and the
// status code as "err=<prefix> code=<status>".
func (e *fargoUnsuccessfulHTTPResponse) Error() string {
	prefix, code := e.messagePrefix, e.statusCode
	return fmt.Sprintf("err=%s code=%d", prefix, code)
}
36
37// Registrar maintains service instance liveness information in Eureka.
38type Registrar struct {
39	conn     fargoConnection
40	instance *fargo.Instance
41	logger   log.Logger
42	quitc    chan chan struct{}
43	sync.Mutex
44}
45
46var _ sd.Registrar = (*Registrar)(nil)
47
48// NewRegistrar returns an Eureka Registrar acting on behalf of the provided
49// Fargo connection and instance. See the integration test for usage examples.
50func NewRegistrar(conn fargoConnection, instance *fargo.Instance, logger log.Logger) *Registrar {
51	return &Registrar{
52		conn:     conn,
53		instance: instance,
54		logger:   log.With(logger, "service", instance.App, "address", fmt.Sprintf("%s:%d", instance.IPAddr, instance.Port)),
55	}
56}
57
58// Register implements sd.Registrar.
59func (r *Registrar) Register() {
60	r.Lock()
61	defer r.Unlock()
62
63	if r.quitc != nil {
64		return // Already in the registration loop.
65	}
66
67	if err := r.conn.RegisterInstance(r.instance); err != nil {
68		r.logger.Log("during", "Register", "err", err)
69	}
70
71	r.quitc = make(chan chan struct{})
72	go r.loop()
73}
74
75// Deregister implements sd.Registrar.
76func (r *Registrar) Deregister() {
77	r.Lock()
78	defer r.Unlock()
79
80	if r.quitc == nil {
81		return // Already deregistered.
82	}
83
84	q := make(chan struct{})
85	r.quitc <- q
86	<-q
87	r.quitc = nil
88}
89
90func (r *Registrar) loop() {
91	var renewalInterval time.Duration
92	if r.instance.LeaseInfo.RenewalIntervalInSecs > 0 {
93		renewalInterval = time.Duration(r.instance.LeaseInfo.RenewalIntervalInSecs) * time.Second
94	} else {
95		renewalInterval = defaultRenewalInterval
96	}
97	ticker := time.NewTicker(renewalInterval)
98	defer ticker.Stop()
99
100	for {
101		select {
102		case <-ticker.C:
103			if err := r.heartbeat(); err != nil {
104				r.logger.Log("during", "heartbeat", "err", err)
105			}
106
107		case q := <-r.quitc:
108			if err := r.conn.DeregisterInstance(r.instance); err != nil {
109				r.logger.Log("during", "Deregister", "err", err)
110			}
111			close(q)
112			return
113		}
114	}
115}
116
117func httpResponseStatusCode(err error) (code int, present bool) {
118	if code, ok := fargo.HTTPResponseStatusCode(err); ok {
119		return code, true
120	}
121	// Allow injection of errors for testing.
122	if u, ok := err.(*fargoUnsuccessfulHTTPResponse); ok {
123		return u.statusCode, true
124	}
125	return 0, false
126}
127
128func isNotFound(err error) bool {
129	code, ok := httpResponseStatusCode(err)
130	return ok && code == http.StatusNotFound
131}
132
133func (r *Registrar) heartbeat() error {
134	err := r.conn.HeartBeatInstance(r.instance)
135	if err == nil {
136		return nil
137	}
138	if isNotFound(err) {
139		// Instance expired (e.g. network partition). Re-register.
140		return r.conn.ReregisterInstance(r.instance)
141	}
142	return err
143}
144