1package main
2
3import (
4	"bytes"
5	"compress/gzip"
6	"encoding/json"
7	_ "expvar"
8	"flag"
9	"fmt"
10	"io"
11	"io/ioutil"
12	"net/http"
13	"net/http/httptest"
14	"net/http/httputil"
15	"net/url"
16	"os"
17	"strings"
18	"time"
19
20	"github.com/facebookgo/httpcontrol"
21
22	version "bosun.org/_version"
23
24	"bosun.org/cmd/tsdbrelay/denormalize"
25	"bosun.org/collect"
26	"bosun.org/metadata"
27	"bosun.org/opentsdb"
28	"bosun.org/slog"
29	"bosun.org/util"
30)
31
32var (
33	listenAddr       = flag.String("l", ":4242", "Listen address.")
34	bosunServer      = flag.String("b", "bosun", "Target Bosun server. Can specify port with host:port.")
35	secondaryRelays  = flag.String("r", "", "Additional relays to send data to. Intended for secondary data center replication. Only response from primary tsdb server wil be relayed to clients.")
36	tsdbServer       = flag.String("t", "", "Target OpenTSDB server. Can specify port with host:port.")
37	logVerbose       = flag.Bool("v", false, "enable verbose logging")
38	hostnameOverride = flag.String("hostname", "", "Override the own hostname. Especially useful when running in a container.")
39	useFullHostname  = flag.Bool("useFullHostname", false, "Whether to use the fully qualified hostname")
40	toDenormalize    = flag.String("denormalize", "", "List of metrics to denormalize. Comma seperated list of `metric__tagname__tagname` rules. Will be translated to `__tagvalue.tagvalue.metric`")
41	flagVersion      = flag.Bool("version", false, "Prints the version and exits.")
42
43	redisHost = flag.String("redis", "", "redis host for aggregating external counters")
44	redisDb   = flag.Int("db", 0, "redis db to use for counters")
45)
46
47var (
48	tsdbPutURL    string
49	bosunIndexURL string
50
51	denormalizationRules map[string]*denormalize.DenormalizationRule
52
53	relayDataUrls     []string
54	relayMetadataUrls []string
55
56	tags = opentsdb.TagSet{}
57)
58
59type tsdbrelayHTTPTransport struct {
60	UserAgent string
61	http.RoundTripper
62}
63
64func (t *tsdbrelayHTTPTransport) RoundTrip(req *http.Request) (*http.Response, error) {
65	if req.Header.Get("User-Agent") == "" {
66		req.Header.Add("User-Agent", t.UserAgent)
67	}
68	return t.RoundTripper.RoundTrip(req)
69}
70
71func init() {
72	client := &http.Client{
73		Transport: &tsdbrelayHTTPTransport{
74			"Tsdbrelay/" + version.ShortVersion(),
75			&httpcontrol.Transport{
76				RequestTimeout: time.Minute,
77			},
78		},
79	}
80	http.DefaultClient = client
81	collect.DefaultClient = client
82}
83
84func main() {
85	var err error
86	myHost, err = os.Hostname()
87	if err != nil || myHost == "" {
88		myHost = "tsdbrelay"
89	}
90
91	flag.Parse()
92	if *flagVersion {
93		fmt.Println(version.GetVersionInfo("tsdbrelay"))
94		os.Exit(0)
95	}
96	if *bosunServer == "" || *tsdbServer == "" {
97		slog.Fatal("must specify both bosun and tsdb server")
98	}
99	slog.Infoln(version.GetVersionInfo("tsdbrelay"))
100	slog.Infoln("listen on", *listenAddr)
101	slog.Infoln("relay to bosun at", *bosunServer)
102	slog.Infoln("relay to tsdb at", *tsdbServer)
103	if *toDenormalize != "" {
104		var err error
105		denormalizationRules, err = denormalize.ParseDenormalizationRules(*toDenormalize)
106		if err != nil {
107			slog.Fatal(err)
108		}
109	}
110
111	util.InitHostManager(*hostnameOverride, *useFullHostname)
112
113	tsdbURL, err := parseHost(*tsdbServer, "", true)
114	if err != nil {
115		slog.Fatalf("Invalid -t value: %s", err)
116	}
117	u := *tsdbURL
118	u.Path = "/api/put"
119	tsdbPutURL = u.String()
120	bosunURL, err := parseHost(*bosunServer, "", true)
121	if err != nil {
122		slog.Fatalf("Invalid -b value: %s", err)
123	}
124	u = *bosunURL
125	u.Path = "/api/index"
126	bosunIndexURL = u.String()
127	if *secondaryRelays != "" {
128		for _, rURL := range strings.Split(*secondaryRelays, ",") {
129			u, err := parseHost(rURL, "/api/put", false)
130			if err != nil {
131				slog.Fatalf("Invalid -r value '%s': %s", rURL, err)
132			}
133			f := u.Fragment
134			u.Fragment = ""
135			if f == "" || strings.ToLower(f) == "data-only" {
136				relayDataUrls = append(relayDataUrls, u.String())
137			}
138			if f == "" || strings.ToLower(f) == "metadata-only" || strings.ToLower(f) == "bosun-index" {
139				u.Path = "/api/metadata/put"
140				relayMetadataUrls = append(relayMetadataUrls, u.String())
141			}
142			if strings.ToLower(f) == "bosun-index" {
143				u.Path = "/api/index"
144				relayDataUrls = append(relayDataUrls, u.String())
145			}
146		}
147	}
148
149	tsdbProxy := util.NewSingleHostProxy(tsdbURL)
150	bosunProxy := util.NewSingleHostProxy(bosunURL)
151	rp := &relayProxy{
152		TSDBProxy:  tsdbProxy,
153		BosunProxy: bosunProxy,
154	}
155	http.HandleFunc("/api/put", func(w http.ResponseWriter, r *http.Request) {
156		rp.relayPut(w, r, true)
157	})
158	if *redisHost != "" {
159		http.HandleFunc("/api/count", collect.HandleCounterPut(*redisHost, *redisDb))
160	}
161	http.HandleFunc("/api/metadata/put", func(w http.ResponseWriter, r *http.Request) {
162		rp.relayMetadata(w, r)
163	})
164	http.Handle("/", tsdbProxy)
165
166	collectUrl := &url.URL{
167		Scheme: "http",
168		Host:   *listenAddr,
169		Path:   "/api/put",
170	}
171	if err = collect.Init(collectUrl, "tsdbrelay"); err != nil {
172		slog.Fatal(err)
173	}
174	if err := metadata.Init(collectUrl, false); err != nil {
175		slog.Fatal(err)
176	}
177	// Make sure these get zeroed out instead of going unknown on restart
178	collect.Add("puts.relayed", tags, 0)
179	collect.Add("puts.error", tags, 0)
180	collect.Add("metadata.relayed", tags, 0)
181	collect.Add("metadata.error", tags, 0)
182	collect.Add("additional.puts.relayed", tags, 0)
183	collect.Add("additional.puts.error", tags, 0)
184	metadata.AddMetricMeta("tsdbrelay.puts.relayed", metadata.Counter, metadata.Count, "Number of successful puts relayed to opentsdb target")
185	metadata.AddMetricMeta("tsdbrelay.puts.error", metadata.Counter, metadata.Count, "Number of puts that could not be relayed to opentsdb target")
186	metadata.AddMetricMeta("tsdbrelay.metadata.relayed", metadata.Counter, metadata.Count, "Number of successful metadata puts relayed to bosun target")
187	metadata.AddMetricMeta("tsdbrelay.metadata.error", metadata.Counter, metadata.Count, "Number of metadata puts that could not be relayed to bosun target")
188	metadata.AddMetricMeta("tsdbrelay.additional.puts.relayed", metadata.Counter, metadata.Count, "Number of successful puts relayed to additional targets")
189	metadata.AddMetricMeta("tsdbrelay.additional.puts.error", metadata.Counter, metadata.Count, "Number of puts that could not be relayed to additional targets")
190	slog.Fatal(http.ListenAndServe(*listenAddr, nil))
191}
192
193func verbose(format string, a ...interface{}) {
194	if *logVerbose {
195		slog.Infof(format, a...)
196	}
197}
198
199type relayProxy struct {
200	TSDBProxy  *httputil.ReverseProxy
201	BosunProxy *httputil.ReverseProxy
202}
203
204type passthru struct {
205	io.ReadCloser
206	buf bytes.Buffer
207}
208
209func (p *passthru) Read(b []byte) (int, error) {
210	n, err := p.ReadCloser.Read(b)
211	p.buf.Write(b[:n])
212	return n, err
213}
214
215type relayWriter struct {
216	http.ResponseWriter
217	code int
218}
219
220func (rw *relayWriter) WriteHeader(code int) {
221	rw.code = code
222	rw.ResponseWriter.WriteHeader(code)
223}
224
225var (
226	relayHeader  = "X-Relayed-From"
227	encHeader    = "Content-Encoding"
228	typeHeader   = "Content-Type"
229	accessHeader = "X-Access-Token"
230	myHost       string
231)
232
233func (rp *relayProxy) relayPut(responseWriter http.ResponseWriter, r *http.Request, parse bool) {
234	isRelayed := r.Header.Get(relayHeader) != ""
235	reader := &passthru{ReadCloser: r.Body}
236	r.Body = reader
237	w := &relayWriter{ResponseWriter: responseWriter}
238	rp.TSDBProxy.ServeHTTP(w, r)
239	if w.code/100 != 2 {
240		verbose("relayPut got status %d", w.code)
241		collect.Add("puts.error", tags, 1)
242		return
243	}
244	verbose("relayed to tsdb")
245	collect.Add("puts.relayed", tags, 1)
246	// Send to bosun in a separate go routine so we can end the source's request.
247	go func() {
248		body := bytes.NewBuffer(reader.buf.Bytes())
249		req, err := http.NewRequest(r.Method, bosunIndexURL, body)
250		if err != nil {
251			verbose("bosun connect error: %v", err)
252			return
253		}
254		if access := r.Header.Get(accessHeader); access != "" {
255			req.Header.Set(accessHeader, access)
256		}
257		resp, err := http.DefaultClient.Do(req)
258		if err != nil {
259			verbose("bosun relay error: %v", err)
260			return
261		}
262		// Drain up to 512 bytes and close the body to let the Transport reuse the connection
263		io.CopyN(ioutil.Discard, resp.Body, 512)
264		resp.Body.Close()
265		verbose("bosun relay success")
266	}()
267	// Parse and denormalize datapoints
268	if !isRelayed && parse && denormalizationRules != nil {
269		go rp.denormalize(bytes.NewReader(reader.buf.Bytes()))
270	}
271
272	if !isRelayed && len(relayDataUrls) > 0 {
273		go func() {
274			for _, relayURL := range relayDataUrls {
275				body := bytes.NewBuffer(reader.buf.Bytes())
276				req, err := http.NewRequest(r.Method, relayURL, body)
277				if err != nil {
278					verbose("%s connect error: %v", relayURL, err)
279					collect.Add("additional.puts.error", tags, 1)
280					continue
281				}
282				if contenttype := r.Header.Get(typeHeader); contenttype != "" {
283					req.Header.Set(typeHeader, contenttype)
284				}
285				if access := r.Header.Get(accessHeader); access != "" {
286					req.Header.Set(accessHeader, access)
287				}
288				if encoding := r.Header.Get(encHeader); encoding != "" {
289					req.Header.Set(encHeader, encoding)
290				}
291				req.Header.Add(relayHeader, myHost)
292				resp, err := http.DefaultClient.Do(req)
293				if err != nil {
294					verbose("secondary relay error: %v", err)
295					collect.Add("additional.puts.error", tags, 1)
296					continue
297				}
298				// Drain up to 512 bytes and close the body to let the Transport reuse the connection
299				io.CopyN(ioutil.Discard, resp.Body, 512)
300				resp.Body.Close()
301				verbose("secondary relay success")
302				collect.Add("additional.puts.relayed", tags, 1)
303			}
304		}()
305	}
306}
307
308func (rp *relayProxy) denormalize(body io.Reader) {
309	gReader, err := gzip.NewReader(body)
310	if err != nil {
311		verbose("error making gzip reader: %v", err)
312		return
313	}
314	decoder := json.NewDecoder(gReader)
315	dps := []*opentsdb.DataPoint{}
316	err = decoder.Decode(&dps)
317	if err != nil {
318		verbose("error decoding data points: %v", err)
319		return
320	}
321	relayDps := []*opentsdb.DataPoint{}
322	for _, dp := range dps {
323		if rule, ok := denormalizationRules[dp.Metric]; ok {
324			if err = rule.Translate(dp); err == nil {
325				relayDps = append(relayDps, dp)
326			} else {
327				verbose("error translating points: %v", err.Error())
328			}
329		}
330	}
331	if len(relayDps) == 0 {
332		return
333	}
334	buf := &bytes.Buffer{}
335	gWriter := gzip.NewWriter(buf)
336	encoder := json.NewEncoder(gWriter)
337	err = encoder.Encode(relayDps)
338	if err != nil {
339		verbose("error encoding denormalized data points: %v", err)
340		return
341	}
342	if err = gWriter.Close(); err != nil {
343		verbose("error zipping denormalized data points: %v", err)
344		return
345	}
346	req, err := http.NewRequest("POST", tsdbPutURL, buf)
347	if err != nil {
348		verbose("error posting denormalized data points: %v", err)
349		return
350	}
351	req.Header.Set(typeHeader, "application/json")
352	req.Header.Set(encHeader, "gzip")
353
354	responseWriter := httptest.NewRecorder()
355	rp.relayPut(responseWriter, req, false)
356
357	verbose("relayed %d denormalized data points. Tsdb response: %d", len(relayDps), responseWriter.Code)
358}
359
360func (rp *relayProxy) relayMetadata(responseWriter http.ResponseWriter, r *http.Request) {
361	reader := &passthru{ReadCloser: r.Body}
362	r.Body = reader
363	w := &relayWriter{ResponseWriter: responseWriter}
364	rp.BosunProxy.ServeHTTP(w, r)
365	if w.code != 204 {
366		verbose("relayMetadata got status %d", w.code)
367		collect.Add("metadata.error", tags, 1)
368		return
369	}
370	verbose("relayed metadata to bosun")
371	collect.Add("metadata.relayed", tags, 1)
372	if r.Header.Get(relayHeader) != "" {
373		return
374	}
375	if len(relayMetadataUrls) != 0 {
376		go func() {
377			for _, relayURL := range relayMetadataUrls {
378				body := bytes.NewBuffer(reader.buf.Bytes())
379				req, err := http.NewRequest(r.Method, relayURL, body)
380				if err != nil {
381					verbose("metadata %s error %v", relayURL, err)
382					continue
383				}
384				if contenttype := r.Header.Get(typeHeader); contenttype != "" {
385					req.Header.Set(typeHeader, contenttype)
386				}
387				if access := r.Header.Get(accessHeader); access != "" {
388					req.Header.Set(accessHeader, access)
389				}
390				if encoding := r.Header.Get(encHeader); encoding != "" {
391					req.Header.Set(encHeader, encoding)
392				}
393				req.Header.Add(relayHeader, myHost)
394				resp, err := http.DefaultClient.Do(req)
395				if err != nil {
396					verbose("secondary relay metadata error: %v", err)
397					continue
398				}
399				// Drain up to 512 bytes and close the body to let the Transport reuse the connection
400				io.CopyN(ioutil.Discard, resp.Body, 512)
401				resp.Body.Close()
402				verbose("secondary relay metadata success")
403			}
404		}()
405	}
406}
407
408// Parses a url of the form proto://host:port/path#fragment with the following rules:
409// proto:// is optional and will default to http:// if omitted
410// :port is optional and will use the default if omitted
411// /path is optional and will be ignored, will always be replaced by newpath
412// #fragment is optional and will be removed if removeFragment is true
413func parseHost(host string, newpath string, removeFragment bool) (*url.URL, error) {
414	if !strings.Contains(host, "//") {
415		host = "http://" + host
416	}
417	u, err := url.Parse(host)
418	if err != nil {
419		return nil, err
420	}
421	if u.Host == "" {
422		return nil, fmt.Errorf("no host specified")
423	}
424	u.Path = newpath
425	if removeFragment {
426		u.Fragment = ""
427	}
428	return u, nil
429}
430