1package notifications
2
3import (
4	"net/http"
5	"time"
6)
7
8// EndpointConfig covers the optional configuration parameters for an active
9// endpoint.
10type EndpointConfig struct {
11	Headers           http.Header
12	Timeout           time.Duration
13	Threshold         int
14	Backoff           time.Duration
15	IgnoredMediaTypes []string
16	Transport         *http.Transport
17}
18
19// defaults set any zero-valued fields to a reasonable default.
20func (ec *EndpointConfig) defaults() {
21	if ec.Timeout <= 0 {
22		ec.Timeout = time.Second
23	}
24
25	if ec.Threshold <= 0 {
26		ec.Threshold = 10
27	}
28
29	if ec.Backoff <= 0 {
30		ec.Backoff = time.Second
31	}
32
33	if ec.Transport == nil {
34		ec.Transport = http.DefaultTransport.(*http.Transport)
35	}
36}
37
38// Endpoint is a reliable, queued, thread-safe sink that notify external http
39// services when events are written. Writes are non-blocking and always
40// succeed for callers but events may be queued internally.
41type Endpoint struct {
42	Sink
43	url  string
44	name string
45
46	EndpointConfig
47
48	metrics *safeMetrics
49}
50
51// NewEndpoint returns a running endpoint, ready to receive events.
52func NewEndpoint(name, url string, config EndpointConfig) *Endpoint {
53	var endpoint Endpoint
54	endpoint.name = name
55	endpoint.url = url
56	endpoint.EndpointConfig = config
57	endpoint.defaults()
58	endpoint.metrics = newSafeMetrics()
59
60	// Configures the inmemory queue, retry, http pipeline.
61	endpoint.Sink = newHTTPSink(
62		endpoint.url, endpoint.Timeout, endpoint.Headers,
63		endpoint.Transport, endpoint.metrics.httpStatusListener())
64	endpoint.Sink = newRetryingSink(endpoint.Sink, endpoint.Threshold, endpoint.Backoff)
65	endpoint.Sink = newEventQueue(endpoint.Sink, endpoint.metrics.eventQueueListener())
66	endpoint.Sink = newIgnoredMediaTypesSink(endpoint.Sink, config.IgnoredMediaTypes)
67
68	register(&endpoint)
69	return &endpoint
70}
71
72// Name returns the name of the endpoint, generally used for debugging.
73func (e *Endpoint) Name() string {
74	return e.name
75}
76
77// URL returns the url of the endpoint.
78func (e *Endpoint) URL() string {
79	return e.url
80}
81
82// ReadMetrics populates em with metrics from the endpoint.
83func (e *Endpoint) ReadMetrics(em *EndpointMetrics) {
84	e.metrics.Lock()
85	defer e.metrics.Unlock()
86
87	*em = e.metrics.EndpointMetrics
88	// Map still need to copied in a threadsafe manner.
89	em.Statuses = make(map[string]int)
90	for k, v := range e.metrics.Statuses {
91		em.Statuses[k] = v
92	}
93}
94