1package notifications 2 3import ( 4 "net/http" 5 "time" 6) 7 8// EndpointConfig covers the optional configuration parameters for an active 9// endpoint. 10type EndpointConfig struct { 11 Headers http.Header 12 Timeout time.Duration 13 Threshold int 14 Backoff time.Duration 15 IgnoredMediaTypes []string 16 Transport *http.Transport 17} 18 19// defaults set any zero-valued fields to a reasonable default. 20func (ec *EndpointConfig) defaults() { 21 if ec.Timeout <= 0 { 22 ec.Timeout = time.Second 23 } 24 25 if ec.Threshold <= 0 { 26 ec.Threshold = 10 27 } 28 29 if ec.Backoff <= 0 { 30 ec.Backoff = time.Second 31 } 32 33 if ec.Transport == nil { 34 ec.Transport = http.DefaultTransport.(*http.Transport) 35 } 36} 37 38// Endpoint is a reliable, queued, thread-safe sink that notify external http 39// services when events are written. Writes are non-blocking and always 40// succeed for callers but events may be queued internally. 41type Endpoint struct { 42 Sink 43 url string 44 name string 45 46 EndpointConfig 47 48 metrics *safeMetrics 49} 50 51// NewEndpoint returns a running endpoint, ready to receive events. 52func NewEndpoint(name, url string, config EndpointConfig) *Endpoint { 53 var endpoint Endpoint 54 endpoint.name = name 55 endpoint.url = url 56 endpoint.EndpointConfig = config 57 endpoint.defaults() 58 endpoint.metrics = newSafeMetrics() 59 60 // Configures the inmemory queue, retry, http pipeline. 61 endpoint.Sink = newHTTPSink( 62 endpoint.url, endpoint.Timeout, endpoint.Headers, 63 endpoint.Transport, endpoint.metrics.httpStatusListener()) 64 endpoint.Sink = newRetryingSink(endpoint.Sink, endpoint.Threshold, endpoint.Backoff) 65 endpoint.Sink = newEventQueue(endpoint.Sink, endpoint.metrics.eventQueueListener()) 66 endpoint.Sink = newIgnoredMediaTypesSink(endpoint.Sink, config.IgnoredMediaTypes) 67 68 register(&endpoint) 69 return &endpoint 70} 71 72// Name returns the name of the endpoint, generally used for debugging. 73func (e *Endpoint) Name() string { 74 return e.name 75} 76 77// URL returns the url of the endpoint. 78func (e *Endpoint) URL() string { 79 return e.url 80} 81 82// ReadMetrics populates em with metrics from the endpoint. 83func (e *Endpoint) ReadMetrics(em *EndpointMetrics) { 84 e.metrics.Lock() 85 defer e.metrics.Unlock() 86 87 *em = e.metrics.EndpointMetrics 88 // Map still need to copied in a threadsafe manner. 89 em.Statuses = make(map[string]int) 90 for k, v := range e.metrics.Statuses { 91 em.Statuses[k] = v 92 } 93} 94