1package sd
2
3import (
4	"time"
5
6	"github.com/go-kit/kit/endpoint"
7	"github.com/go-kit/kit/log"
8)
9
// Endpointer is a source of interchangeable endpoints that is kept up to date
// by a service discovery system. Callers ask for the current set on demand via
// Endpoints.
//
// A non-nil error signals a connectivity problem with the service discovery
// system, or a failure inside that system. An empty result with a nil error is
// valid: it simply means no endpoints are currently known.
type Endpointer interface {
	// Endpoints returns the currently known endpoints, or an error if the
	// service discovery system could not be consulted.
	Endpoints() ([]endpoint.Endpoint, error)
}
17
18// FixedEndpointer yields a fixed set of endpoints.
19type FixedEndpointer []endpoint.Endpoint
20
21// Endpoints implements Endpointer.
22func (s FixedEndpointer) Endpoints() ([]endpoint.Endpoint, error) { return s, nil }
23
24// NewEndpointer creates an Endpointer that subscribes to updates from Instancer src
25// and uses factory f to create Endpoints. If src notifies of an error, the Endpointer
26// keeps returning previously created Endpoints assuming they are still good, unless
27// this behavior is disabled via InvalidateOnError option.
28func NewEndpointer(src Instancer, f Factory, logger log.Logger, options ...EndpointerOption) *DefaultEndpointer {
29	opts := endpointerOptions{}
30	for _, opt := range options {
31		opt(&opts)
32	}
33	se := &DefaultEndpointer{
34		cache:     newEndpointCache(f, logger, opts),
35		instancer: src,
36		ch:        make(chan Event),
37	}
38	go se.receive()
39	src.Register(se.ch)
40	return se
41}
42
// EndpointerOption is a functional option that adjusts how the endpoint cache
// behind an Endpointer behaves.
type EndpointerOption func(*endpointerOptions)

// InvalidateOnError returns an EndpointerOption that governs what the
// Endpointer does when the Instancer publishes an Event carrying an error.
//
// Without this option, the Endpointer keeps returning the last known
// endpoints. With it, the last known endpoints are still returned until
// timeout has elapsed; after that all active endpoints are closed and the
// Endpointer starts returning an error. Normal operation resumes as soon as
// the Instancer delivers a new update with valid resource instances.
func InvalidateOnError(timeout time.Duration) EndpointerOption {
	apply := func(o *endpointerOptions) {
		o.invalidateTimeout = timeout
		o.invalidateOnError = true
	}
	return apply
}

// endpointerOptions collects the settings that EndpointerOption values
// can modify.
type endpointerOptions struct {
	// invalidateOnError is switched on by InvalidateOnError.
	invalidateOnError bool
	// invalidateTimeout is the timeout passed to InvalidateOnError.
	invalidateTimeout time.Duration
}
64
// DefaultEndpointer implements the Endpointer interface.
// When created with the NewEndpointer function, it automatically registers
// as a subscriber to events from the Instancer and maintains a list
// of active Endpoints.
type DefaultEndpointer struct {
	// cache holds the endpoints; it is updated from incoming events by
	// receive and queried by Endpoints.
	cache *endpointCache
	// instancer is the event source this endpointer is registered with;
	// Close uses it to deregister ch.
	instancer Instancer
	// ch is the channel registered with instancer; receive drains it
	// until Close closes it.
	ch chan Event
}
74
75func (de *DefaultEndpointer) receive() {
76	for event := range de.ch {
77		de.cache.Update(event)
78	}
79}
80
81// Close deregisters DefaultEndpointer from the Instancer and stops the internal go-routine.
82func (de *DefaultEndpointer) Close() {
83	de.instancer.Deregister(de.ch)
84	close(de.ch)
85}
86
87// Endpoints implements Endpointer.
88func (de *DefaultEndpointer) Endpoints() ([]endpoint.Endpoint, error) {
89	return de.cache.Endpoints()
90}
91