1package sd 2 3import ( 4 "time" 5 6 "github.com/go-kit/kit/endpoint" 7 "github.com/go-kit/kit/log" 8) 9 10// Endpointer listens to a service discovery system and yields a set of 11// identical endpoints on demand. An error indicates a problem with connectivity 12// to the service discovery system, or within the system itself; an Endpointer 13// may yield no endpoints without error. 14type Endpointer interface { 15 Endpoints() ([]endpoint.Endpoint, error) 16} 17 18// FixedEndpointer yields a fixed set of endpoints. 19type FixedEndpointer []endpoint.Endpoint 20 21// Endpoints implements Endpointer. 22func (s FixedEndpointer) Endpoints() ([]endpoint.Endpoint, error) { return s, nil } 23 24// NewEndpointer creates an Endpointer that subscribes to updates from Instancer src 25// and uses factory f to create Endpoints. If src notifies of an error, the Endpointer 26// keeps returning previously created Endpoints assuming they are still good, unless 27// this behavior is disabled via InvalidateOnError option. 28func NewEndpointer(src Instancer, f Factory, logger log.Logger, options ...EndpointerOption) *DefaultEndpointer { 29 opts := endpointerOptions{} 30 for _, opt := range options { 31 opt(&opts) 32 } 33 se := &DefaultEndpointer{ 34 cache: newEndpointCache(f, logger, opts), 35 instancer: src, 36 ch: make(chan Event), 37 } 38 go se.receive() 39 src.Register(se.ch) 40 return se 41} 42 43// EndpointerOption allows control of endpointCache behavior. 44type EndpointerOption func(*endpointerOptions) 45 46// InvalidateOnError returns EndpointerOption that controls how the Endpointer 47// behaves when then Instancer publishes an Event containing an error. 48// Without this option the Endpointer continues returning the last known 49// endpoints. With this option, the Endpointer continues returning the last 50// known endpoints until the timeout elapses, then closes all active endpoints 51// and starts returning an error. Once the Instancer sends a new update with 52// valid resource instances, the normal operation is resumed. 53func InvalidateOnError(timeout time.Duration) EndpointerOption { 54 return func(opts *endpointerOptions) { 55 opts.invalidateOnError = true 56 opts.invalidateTimeout = timeout 57 } 58} 59 60type endpointerOptions struct { 61 invalidateOnError bool 62 invalidateTimeout time.Duration 63} 64 65// DefaultEndpointer implements an Endpointer interface. 66// When created with NewEndpointer function, it automatically registers 67// as a subscriber to events from the Instances and maintains a list 68// of active Endpoints. 69type DefaultEndpointer struct { 70 cache *endpointCache 71 instancer Instancer 72 ch chan Event 73} 74 75func (de *DefaultEndpointer) receive() { 76 for event := range de.ch { 77 de.cache.Update(event) 78 } 79} 80 81// Close deregisters DefaultEndpointer from the Instancer and stops the internal go-routine. 82func (de *DefaultEndpointer) Close() { 83 de.instancer.Deregister(de.ch) 84 close(de.ch) 85} 86 87// Endpoints implements Endpointer. 88func (de *DefaultEndpointer) Endpoints() ([]endpoint.Endpoint, error) { 89 return de.cache.Endpoints() 90} 91