1// Package metadata provides metadata information between bosun and OpenTSDB.
2package metadata // import "bosun.org/metadata"
3
4import (
5	"bytes"
6	"encoding/json"
7	"io"
8	"io/ioutil"
9	"net/http"
10	"net/url"
11	"reflect"
12	"strings"
13	"sync"
14	"time"
15
16	"bosun.org/opentsdb"
17	"bosun.org/slog"
18	"bosun.org/util"
19)
20
21var (
22	// AuthToken is an optional string that sets the X-Access-Token HTTP header
23	// which is used to authenticate against Bosun
24	AuthToken string
25)
26
27// RateType is the type of rate for a metric: gauge, counter, or rate.
28type RateType string
29
30const (
31	// Unknown is a not-yet documented rate type.
32	Unknown RateType = ""
33	// Gauge rate type.
34	Gauge = "gauge"
35	// Counter rate type.
36	Counter = "counter"
37	// Rate rate type.
38	Rate = "rate"
39)
40
41// Unit is the unit for a metric.
42type Unit string
43
44const (
45	// None is a not-yet documented unit.
46	None            Unit = ""
47	A                    = "A"            // Amps
48	ActiveUsers          = "active users" // Google Analytics
49	Alert                = "alerts"
50	Abort                = "aborts"
51	Bool                 = "bool"
52	BitsPerSecond        = "bits per second"
53	Bytes                = "bytes"
54	BytesPerSecond       = "bytes per second"
55	C                    = "C" // Celsius
56	CacheHit             = "cache hits"
57	CacheMiss            = "cache misses"
58	Change               = "changes"
59	Channel              = "channels"
60	Check                = "checks"
61	CHz                  = "CentiHertz"
62	Client               = "clients"
63	Command              = "commands"
64	Connection           = "connections"
65	Consumer             = "consumers"
66	Context              = "contexts"
67	ContextSwitch        = "context switches"
68	Count                = ""
69	Document             = "documents"
70	Enabled              = "enabled"
71	Entropy              = "entropy"
72	Error                = "errors"
73	Event                = ""
74	Eviction             = "evictions"
75	Exchange             = "exchanges"
76	Fault                = "faults"
77	Flush                = "flushes"
78	Files                = "files"
79	Frame                = "frames"
80	Fraction             = "fraction"
81	Get                  = "gets"
82	GetExists            = "get exists"
83	Group                = "groups"
84	Incident             = "incidents"
85	Interupt             = "interupts"
86	InProgress           = "in progress"
87	Item                 = "items"
88	KBytes               = "kbytes"
89	Key                  = "keys"
90	Load                 = "load"
91	EMail                = "emails"
92	MHz                  = "MHz" // MegaHertz
93	Megabit              = "Mbit"
94	Merge                = "merges"
95	Message              = "messages"
96	MilliSecond          = "milliseconds"
97	Nanosecond           = "nanoseconds"
98	Node                 = "nodes"
99	Ok                   = "ok" // "OK" or not status, 0 = ok, 1 = not ok
100	Operation            = "Operations"
101	Packet               = "packets"
102	Page                 = "pages"
103	Pct                  = "percent" // Range of 0-100.
104	PerSecond            = "per second"
105	Pool                 = "pools"
106	Process              = "processes"
107	Priority             = "priority"
108	Query                = "queries"
109	Queue                = "queues"
110	Ratio                = "ratio"
111	Redispatch           = "redispatches"
112	Refresh              = "refreshes"
113	Replica              = "replicas"
114	Retry                = "retries"
115	Response             = "responses"
116	Request              = "requests"
117	RPM                  = "RPM" // Rotations per minute.
118	Scheduled            = "scheduled"
119	Score                = "score"
120	Second               = "seconds"
121	Sector               = "sectors"
122	Segment              = "segments"
123	Server               = "servers"
124	Session              = "sessions"
125	Shard                = "shards"
126	Slave                = "slaves"
127	Socket               = "sockets"
128	Suggest              = "suggests"
129	StatusCode           = "status code"
130	Resync               = "resynchronizations"
131	Syscall              = "system calls"
132	Thread               = "threads"
133	Timestamp            = "timestamp"
134	Transition           = "transitions"
135	USD                  = "US dollars"
136	V                    = "V" // Volts
137	V10                  = "tenth-Volts"
138	Vulnerabilities      = "vulnerabilities"
139	Watt                 = "Watts"
140	Weight               = "weight"
141	Yield                = "yields"
142)
143
144// Metakey uniquely identifies a metadata entry.
145type Metakey struct {
146	Metric string
147	Tags   string
148	Name   string
149}
150
151// TagSet returns m's tags.
152func (m Metakey) TagSet() opentsdb.TagSet {
153	tags, err := opentsdb.ParseTags(m.Tags)
154	if err != nil {
155		return nil
156	}
157	return tags
158}
159
160var (
161	metadata  = make(map[Metakey]interface{})
162	metalock  sync.Mutex
163	metahost  string
164	metafuncs []func()
165	metadebug bool
166)
167
168// AddMeta adds a metadata entry to memory, which is queued for later sending.
169func AddMeta(metric string, tags opentsdb.TagSet, name string, value interface{}, setHost bool) {
170	if tags == nil {
171		tags = make(opentsdb.TagSet)
172	}
173	if _, present := tags["host"]; setHost && !present {
174		tags["host"] = util.GetHostManager().GetHostName()
175	}
176	if err := tags.Clean(); err != nil {
177		slog.Error(err)
178		return
179	}
180	ts := tags.Tags()
181	metalock.Lock()
182	defer metalock.Unlock()
183	prev, present := metadata[Metakey{metric, ts, name}]
184	if present && !reflect.DeepEqual(prev, value) {
185		slog.Infof("metadata changed for %s/%s/%s: %v to %v", metric, ts, name, prev, value)
186		go sendMetadata([]Metasend{{
187			Metric: metric,
188			Tags:   tags,
189			Name:   name,
190			Value:  value,
191		}})
192	} else if metadebug {
193		slog.Infof("AddMeta for %s/%s/%s: %v", metric, ts, name, value)
194	}
195	metadata[Metakey{metric, ts, name}] = value
196}
197
198// AddMetricMeta is a convenience function to set the main metadata fields for a
199// metric. Those fields are rate, unit, and description. If you need to document
200// tag keys then use AddMeta.
201func AddMetricMeta(metric string, rate RateType, unit Unit, desc string) {
202	AddMeta(metric, nil, "rate", rate, false)
203	AddMeta(metric, nil, "unit", unit, false)
204	AddMeta(metric, nil, "desc", desc, false)
205}
206
207// Init initializes the metadata send queue.
208func Init(u *url.URL, debug bool) error {
209	mh, err := u.Parse("/api/metadata/put")
210	if err != nil {
211		return err
212	}
213	if strings.HasPrefix(mh.Host, ":") {
214		mh.Host = "localhost" + mh.Host
215	}
216	metahost = mh.String()
217	metadebug = debug
218	go collectMetadata()
219	return nil
220}
221
222var putFunction func(k Metakey, v interface{}) error
223
224func InitF(debug bool, f func(k Metakey, v interface{}) error) error {
225	putFunction = f
226	metadebug = debug
227	go collectMetadata()
228	return nil
229}
230
231func collectMetadata() {
232	// Wait a bit so hopefully our collectors have run once and populated the
233	// metadata.
234	time.Sleep(time.Minute)
235	for {
236		FlushMetadata()
237		time.Sleep(time.Hour)
238	}
239}
240
241func FlushMetadata() {
242	for _, f := range metafuncs {
243		f()
244	}
245	if len(metadata) == 0 {
246		return
247	}
248	metalock.Lock()
249	ms := make([]Metasend, len(metadata))
250	i := 0
251	for k, v := range metadata {
252		ms[i] = Metasend{
253			Metric: k.Metric,
254			Tags:   k.TagSet(),
255			Name:   k.Name,
256			Value:  v,
257		}
258		i++
259	}
260	metalock.Unlock()
261	sendMetadata(ms)
262}
263
264// Metasend is the struct for sending metadata to bosun.
265type Metasend struct {
266	Metric string          `json:",omitempty"`
267	Tags   opentsdb.TagSet `json:",omitempty"`
268	Name   string          `json:",omitempty"`
269	Value  interface{}
270	Time   *time.Time `json:",omitempty"`
271}
272
273func sendMetadata(ms []Metasend) {
274	if putFunction != nil {
275		for _, m := range ms {
276			key := Metakey{
277				Metric: m.Metric,
278				Name:   m.Name,
279				Tags:   m.Tags.Tags(),
280			}
281			err := putFunction(key, m.Value)
282			if err != nil {
283				slog.Error(err)
284				continue
285			}
286		}
287	} else {
288		postMetadata(ms)
289	}
290}
291func postMetadata(ms []Metasend) {
292	b, err := json.Marshal(&ms)
293	if err != nil {
294		slog.Error(err)
295		return
296	}
297	req, err := http.NewRequest(http.MethodPost, metahost, bytes.NewBuffer(b))
298	if err != nil {
299		slog.Error(err)
300		return
301	}
302	req.Header.Set("Content-Type", "application/json")
303	if AuthToken != "" {
304		req.Header.Set("X-Access-Token", AuthToken)
305	}
306	client := http.DefaultClient
307	resp, err := client.Do(req)
308	if err != nil {
309		slog.Error(err)
310		return
311	}
312	defer resp.Body.Close()
313	// Drain up to 512 bytes and close the body to let the Transport reuse the connection
314	io.CopyN(ioutil.Discard, resp.Body, 512)
315	if resp.StatusCode != 204 {
316		slog.Errorln("bad metadata return:", resp.Status)
317		return
318	}
319}
320