package main

import (
	"bytes"
	"compress/gzip"
	"encoding/json"
	_ "expvar"
	"flag"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
	"net/http/httptest"
	"net/http/httputil"
	"net/url"
	"os"
	"strings"
	"time"

	"github.com/facebookgo/httpcontrol"

	version "bosun.org/_version"

	"bosun.org/cmd/tsdbrelay/denormalize"
	"bosun.org/collect"
	"bosun.org/metadata"
	"bosun.org/opentsdb"
	"bosun.org/slog"
	"bosun.org/util"
)

// Command-line flags. They are only valid after flag.Parse has run in main.
var (
	listenAddr       = flag.String("l", ":4242", "Listen address.")
	bosunServer      = flag.String("b", "bosun", "Target Bosun server. Can specify port with host:port.")
	secondaryRelays  = flag.String("r", "", "Additional relays to send data to. Intended for secondary data center replication. Only response from primary tsdb server wil be relayed to clients.")
	tsdbServer       = flag.String("t", "", "Target OpenTSDB server. Can specify port with host:port.")
	logVerbose       = flag.Bool("v", false, "enable verbose logging")
	hostnameOverride = flag.String("hostname", "", "Override the own hostname. Especially useful when running in a container.")
	useFullHostname  = flag.Bool("useFullHostname", false, "Whether to use the fully qualified hostname")
	toDenormalize    = flag.String("denormalize", "", "List of metrics to denormalize. Comma seperated list of `metric__tagname__tagname` rules. Will be translated to `__tagvalue.tagvalue.metric`")
	flagVersion      = flag.Bool("version", false, "Prints the version and exits.")

	redisHost = flag.String("redis", "", "redis host for aggregating external counters")
	redisDb   = flag.Int("db", 0, "redis db to use for counters")
)

// Package-level state populated once by main and then read by the handlers.
var (
	// tsdbPutURL is the -t target with path /api/put; denormalized points
	// are posted to it.
	tsdbPutURL string
	// bosunIndexURL is the -b target with path /api/index; successfully
	// relayed puts are forwarded to it.
	bosunIndexURL string

	// denormalizationRules is keyed by metric name (it is looked up with
	// dp.Metric in denormalize). It stays nil when -denormalize is not given.
	denormalizationRules map[string]*denormalize.DenormalizationRule

	// relayDataUrls and relayMetadataUrls are the secondary targets built
	// from the -r flag for put data and metadata respectively.
	relayDataUrls     []string
	relayMetadataUrls []string

	// tags is the (empty) tag set attached to every self-reported counter.
	tags = opentsdb.TagSet{}
)

// tsdbrelayHTTPTransport wraps a RoundTripper and stamps outgoing requests
// with a default User-Agent.
type tsdbrelayHTTPTransport struct {
	UserAgent string
	http.RoundTripper
}

// RoundTrip adds the configured User-Agent header when the request does not
// already carry one, then delegates to the embedded RoundTripper.
func (t *tsdbrelayHTTPTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	if req.Header.Get("User-Agent") == "" {
		req.Header.Add("User-Agent", t.UserAgent)
	}
	return t.RoundTripper.RoundTrip(req)
}

// init replaces http.DefaultClient and collect.DefaultClient with a client
// whose transport sets a "Tsdbrelay/<version>" User-Agent and uses an
// httpcontrol.Transport with a one minute RequestTimeout.
func init() {
	client := &http.Client{
		Transport: &tsdbrelayHTTPTransport{
			"Tsdbrelay/" + version.ShortVersion(),
			&httpcontrol.Transport{
				RequestTimeout: time.Minute,
			},
		},
	}
	http.DefaultClient = client
	collect.DefaultClient = client
}

// main parses flags, derives the target URLs, registers the HTTP handlers and
// self-metrics, and then serves on the listen address until the server fails.
func main() {
	var err error
	// myHost is the value sent in the X-Relayed-From header; fall back to a
	// fixed name when the OS hostname is unavailable or empty.
	myHost, err = os.Hostname()
	if err != nil || myHost == "" {
		myHost = "tsdbrelay"
	}

	flag.Parse()
	if *flagVersion {
		fmt.Println(version.GetVersionInfo("tsdbrelay"))
		os.Exit(0)
	}
	if *bosunServer == "" || *tsdbServer == "" {
		slog.Fatal("must specify both bosun and tsdb server")
	}
	slog.Infoln(version.GetVersionInfo("tsdbrelay"))
	slog.Infoln("listen on", *listenAddr)
	slog.Infoln("relay to bosun at", *bosunServer)
	slog.Infoln("relay to tsdb at", *tsdbServer)
	if *toDenormalize != "" {
		var err error
		denormalizationRules, err = denormalize.ParseDenormalizationRules(*toDenormalize)
		if err != nil {
			slog.Fatal(err)
		}
	}

	util.InitHostManager(*hostnameOverride, *useFullHostname)

	// The parsed URLs (empty path) are kept for the reverse proxies; copies
	// with a fixed path are rendered to strings for direct POSTs.
	tsdbURL, err := parseHost(*tsdbServer, "", true)
	if err != nil {
		slog.Fatalf("Invalid -t value: %s", err)
	}
	u := *tsdbURL
	u.Path = "/api/put"
	tsdbPutURL = u.String()
	bosunURL, err := parseHost(*bosunServer, "", true)
	if err != nil {
		slog.Fatalf("Invalid -b value: %s", err)
	}
	u = *bosunURL
	u.Path = "/api/index"
	bosunIndexURL = u.String()
	if *secondaryRelays != "" {
		// Each -r entry may carry a #fragment selecting what it receives:
		//   (none)         data at /api/put and metadata at /api/metadata/put
		//   data-only      data at /api/put
		//   metadata-only  metadata at /api/metadata/put
		//   bosun-index    metadata at /api/metadata/put and data at /api/index
		// The fragment is compared case-insensitively and is stripped from
		// the stored URL.
		for _, rURL := range strings.Split(*secondaryRelays, ",") {
			u, err := parseHost(rURL, "/api/put", false)
			if err != nil {
				slog.Fatalf("Invalid -r value '%s': %s", rURL, err)
			}
			f := u.Fragment
			u.Fragment = ""
			if f == "" || strings.ToLower(f) == "data-only" {
				relayDataUrls = append(relayDataUrls, u.String())
			}
			if f == "" || strings.ToLower(f) == "metadata-only" || strings.ToLower(f) == "bosun-index" {
				u.Path = "/api/metadata/put"
				relayMetadataUrls = append(relayMetadataUrls, u.String())
			}
			if strings.ToLower(f) == "bosun-index" {
				u.Path = "/api/index"
				relayDataUrls = append(relayDataUrls, u.String())
			}
		}
	}

	tsdbProxy := util.NewSingleHostProxy(tsdbURL)
	bosunProxy := util.NewSingleHostProxy(bosunURL)
	rp := &relayProxy{
		TSDBProxy:  tsdbProxy,
		BosunProxy: bosunProxy,
	}
	http.HandleFunc("/api/put", func(w http.ResponseWriter, r *http.Request) {
		rp.relayPut(w, r, true)
	})
	// The counter endpoint is only registered when a redis host is given.
	if *redisHost != "" {
		http.HandleFunc("/api/count", collect.HandleCounterPut(*redisHost, *redisDb))
	}
	http.HandleFunc("/api/metadata/put", func(w http.ResponseWriter, r *http.Request) {
		rp.relayMetadata(w, r)
	})
	// Every other path is handed to the TSDB proxy unchanged.
	http.Handle("/", tsdbProxy)

	// collect and metadata are initialised with a URL built from this
	// process's own listen address and /api/put.
	// NOTE(review): presumably so self-metrics flow through this relay's own
	// put handler - confirm against collect.Init.
	collectUrl := &url.URL{
		Scheme: "http",
		Host:   *listenAddr,
		Path:   "/api/put",
	}
	if err = collect.Init(collectUrl, "tsdbrelay"); err != nil {
		slog.Fatal(err)
	}
	if err := metadata.Init(collectUrl, false); err != nil {
		slog.Fatal(err)
	}
	// Make sure these get zeroed out instead of going unknown on restart
	collect.Add("puts.relayed", tags, 0)
	collect.Add("puts.error", tags, 0)
	collect.Add("metadata.relayed", tags, 0)
	collect.Add("metadata.error", tags, 0)
	collect.Add("additional.puts.relayed", tags, 0)
	collect.Add("additional.puts.error", tags, 0)
	metadata.AddMetricMeta("tsdbrelay.puts.relayed", metadata.Counter, metadata.Count, "Number of successful puts relayed to opentsdb target")
	metadata.AddMetricMeta("tsdbrelay.puts.error", metadata.Counter, metadata.Count, "Number of puts that could not be relayed to opentsdb target")
	metadata.AddMetricMeta("tsdbrelay.metadata.relayed", metadata.Counter, metadata.Count, "Number of successful metadata puts relayed to bosun target")
	metadata.AddMetricMeta("tsdbrelay.metadata.error", metadata.Counter, metadata.Count, "Number of metadata puts that could not be relayed to bosun target")
	metadata.AddMetricMeta("tsdbrelay.additional.puts.relayed", metadata.Counter, metadata.Count, "Number of successful puts relayed to additional targets")
	metadata.AddMetricMeta("tsdbrelay.additional.puts.error", metadata.Counter, metadata.Count, "Number of puts that could not be relayed to additional targets")
	slog.Fatal(http.ListenAndServe(*listenAddr, nil))
}

// verbose logs a formatted info message only when the -v flag is set.
func verbose(format string, a ...interface{}) {
	if *logVerbose {
		slog.Infof(format, a...)
	}
}

// relayProxy bundles the reverse proxies for the primary TSDB and Bosun
// targets that the relay handlers forward to.
type relayProxy struct {
	TSDBProxy  *httputil.ReverseProxy
	BosunProxy *httputil.ReverseProxy
}

// passthru wraps a request body and keeps a copy of every byte that is read
// through it, so the payload can be re-sent after the proxy has consumed it.
type passthru struct {
	io.ReadCloser
	buf bytes.Buffer
}

// Read reads from the wrapped ReadCloser and appends the bytes read to buf
// before returning the underlying result.
func (p *passthru) Read(b []byte) (int, error) {
	n, err := p.ReadCloser.Read(b)
	p.buf.Write(b[:n])
	return n, err
}

// relayWriter wraps a ResponseWriter and remembers the status code passed to
// WriteHeader. code stays 0 if WriteHeader is never called.
type relayWriter struct {
	http.ResponseWriter
	code int
}

// WriteHeader records the status code and forwards it to the wrapped writer.
func (rw *relayWriter) WriteHeader(code int) {
	rw.code = code
	rw.ResponseWriter.WriteHeader(code)
}

// Header names used when forwarding requests, plus the host name written into
// the relay header.
var (
	relayHeader  = "X-Relayed-From"
	encHeader    = "Content-Encoding"
	typeHeader   = "Content-Type"
	accessHeader = "X-Access-Token"
	myHost       string
)

// relayPut proxies a put request to the TSDB target while capturing the body.
// If the TSDB response is not 2xx it counts an error and stops. Otherwise it
// asynchronously forwards the captured body to the Bosun index URL, and, for
// requests that do not carry the X-Relayed-From header, optionally
// denormalizes the points (when parse is true and rules are configured) and
// forwards the body to every secondary data relay.
// NOTE(review): skipping already-relayed requests presumably prevents relay
// loops between data centers - confirm.
func (rp *relayProxy) relayPut(responseWriter http.ResponseWriter, r *http.Request, parse bool) {
	isRelayed := r.Header.Get(relayHeader) != ""
	reader := &passthru{ReadCloser: r.Body}
	r.Body = reader
	w := &relayWriter{ResponseWriter: responseWriter}
	rp.TSDBProxy.ServeHTTP(w, r)
	if w.code/100 != 2 {
		verbose("relayPut got status %d", w.code)
		collect.Add("puts.error", tags, 1)
		return
	}
	verbose("relayed to tsdb")
	collect.Add("puts.relayed", tags, 1)
	// Send to bosun in a separate go routine so we can end the source's request.
	go func() {
		body := bytes.NewBuffer(reader.buf.Bytes())
		req, err := http.NewRequest(r.Method, bosunIndexURL, body)
		if err != nil {
			verbose("bosun connect error: %v", err)
			return
		}
		if access := r.Header.Get(accessHeader); access != "" {
			req.Header.Set(accessHeader, access)
		}
		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			verbose("bosun relay error: %v", err)
			return
		}
		// Drain up to 512 bytes and close the body to let the Transport reuse the connection
		io.CopyN(ioutil.Discard, resp.Body, 512)
		resp.Body.Close()
		verbose("bosun relay success")
	}()
	// Parse and denormalize datapoints
	if !isRelayed && parse && denormalizationRules != nil {
		go rp.denormalize(bytes.NewReader(reader.buf.Bytes()))
	}

	if !isRelayed && len(relayDataUrls) > 0 {
		go func() {
			// Secondary relays are contacted one after another; a failure
			// on one is counted and does not stop the remaining ones.
			for _, relayURL := range relayDataUrls {
				body := bytes.NewBuffer(reader.buf.Bytes())
				req, err := http.NewRequest(r.Method, relayURL, body)
				if err != nil {
					verbose("%s connect error: %v", relayURL, err)
					collect.Add("additional.puts.error", tags, 1)
					continue
				}
				if contenttype := r.Header.Get(typeHeader); contenttype != "" {
					req.Header.Set(typeHeader, contenttype)
				}
				if access := r.Header.Get(accessHeader); access != "" {
					req.Header.Set(accessHeader, access)
				}
				if encoding := r.Header.Get(encHeader); encoding != "" {
					req.Header.Set(encHeader, encoding)
				}
				// Mark the request as relayed, using this host's name.
				req.Header.Add(relayHeader, myHost)
				resp, err := http.DefaultClient.Do(req)
				if err != nil {
					verbose("secondary relay error: %v", err)
					collect.Add("additional.puts.error", tags, 1)
					continue
				}
				// Drain up to 512 bytes and close the body to let the Transport reuse the connection
				io.CopyN(ioutil.Discard, resp.Body, 512)
				resp.Body.Close()
				verbose("secondary relay success")
				collect.Add("additional.puts.relayed", tags, 1)
			}
		}()
	}
}

// denormalize reads body as a gzip-compressed JSON array of data points,
// applies the matching denormalization rule to each point whose metric has
// one, and posts the translated points (gzip-compressed JSON) back through
// relayPut. Points without a rule, or whose translation fails, are dropped.
// All failures are only reported through verbose logging.
func (rp *relayProxy) denormalize(body io.Reader) {
	gReader, err := gzip.NewReader(body)
	if err != nil {
		verbose("error making gzip reader: %v", err)
		return
	}
	decoder := json.NewDecoder(gReader)
	dps := []*opentsdb.DataPoint{}
	err = decoder.Decode(&dps)
	if err != nil {
		verbose("error decoding data points: %v", err)
		return
	}
	relayDps := []*opentsdb.DataPoint{}
	for _, dp := range dps {
		if rule, ok := denormalizationRules[dp.Metric]; ok {
			if err = rule.Translate(dp); err == nil {
				relayDps = append(relayDps, dp)
			} else {
				verbose("error translating points: %v", err.Error())
			}
		}
	}
	if len(relayDps) == 0 {
		return
	}
	buf := &bytes.Buffer{}
	gWriter := gzip.NewWriter(buf)
	encoder := json.NewEncoder(gWriter)
	err = encoder.Encode(relayDps)
	if err != nil {
		verbose("error encoding denormalized data points: %v", err)
		return
	}
	// Close flushes the remaining compressed data into buf.
	if err = gWriter.Close(); err != nil {
		verbose("error zipping denormalized data points: %v", err)
		return
	}
	req, err := http.NewRequest("POST", tsdbPutURL, buf)
	if err != nil {
		verbose("error posting denormalized data points: %v", err)
		return
	}
	req.Header.Set(typeHeader, "application/json")
	req.Header.Set(encHeader, "gzip")

	// There is no real client to answer, so the response goes to a recorder.
	// parse is false so relayPut does not denormalize these points again.
	responseWriter := httptest.NewRecorder()
	rp.relayPut(responseWriter, req, false)

	verbose("relayed %d denormalized data points. Tsdb response: %d", len(relayDps), responseWriter.Code)
}

// relayMetadata proxies a metadata put to the Bosun target while capturing
// the body. Any status other than 204 is counted as an error. On success, and
// only when the request does not carry the X-Relayed-From header, the
// captured body is forwarded asynchronously to every secondary metadata
// relay.
func (rp *relayProxy) relayMetadata(responseWriter http.ResponseWriter, r *http.Request) {
	reader := &passthru{ReadCloser: r.Body}
	r.Body = reader
	w := &relayWriter{ResponseWriter: responseWriter}
	rp.BosunProxy.ServeHTTP(w, r)
	if w.code != 204 {
		verbose("relayMetadata got status %d", w.code)
		collect.Add("metadata.error", tags, 1)
		return
	}
	verbose("relayed metadata to bosun")
	collect.Add("metadata.relayed", tags, 1)
	if r.Header.Get(relayHeader) != "" {
		return
	}
	if len(relayMetadataUrls) != 0 {
		go func() {
			for _, relayURL := range relayMetadataUrls {
				body := bytes.NewBuffer(reader.buf.Bytes())
				req, err := http.NewRequest(r.Method, relayURL, body)
				if err != nil {
					verbose("metadata %s error %v", relayURL, err)
					continue
				}
				if contenttype := r.Header.Get(typeHeader); contenttype != "" {
					req.Header.Set(typeHeader, contenttype)
				}
				if access := r.Header.Get(accessHeader); access != "" {
					req.Header.Set(accessHeader, access)
				}
				if encoding := r.Header.Get(encHeader); encoding != "" {
					req.Header.Set(encHeader, encoding)
				}
				// Mark the request as relayed, using this host's name.
				req.Header.Add(relayHeader, myHost)
				resp, err := http.DefaultClient.Do(req)
				if err != nil {
					verbose("secondary relay metadata error: %v", err)
					continue
				}
				// Drain up to 512 bytes and close the body to let the Transport reuse the connection
				io.CopyN(ioutil.Discard, resp.Body, 512)
				resp.Body.Close()
				verbose("secondary relay metadata success")
			}
		}()
	}
}

// parseHost parses a url of the form proto://host:port/path#fragment with the following rules:
// proto:// is optional and will default to http:// if omitted
// :port is optional and will use the default if omitted
// /path is optional and will be ignored, will always be replaced by newpath
// #fragment is optional and will be removed if removeFragment is true
func parseHost(host string, newpath string, removeFragment bool) (*url.URL, error) {
	// Values often come from a comma separated flag ("a, b"); surrounding
	// whitespace is never part of a host and would make url.Parse fail.
	host = strings.TrimSpace(host)
	// A bare "host:port" does not parse as a URL, so prepend the default
	// scheme when no "//" authority marker is present.
	if !strings.Contains(host, "//") {
		host = "http://" + host
	}
	u, err := url.Parse(host)
	if err != nil {
		return nil, err
	}
	if u.Host == "" {
		return nil, fmt.Errorf("no host specified")
	}
	// Scheme-relative input such as "//host:port" parses with an empty
	// scheme; default it to http so the result is a usable absolute URL.
	if u.Scheme == "" {
		u.Scheme = "http"
	}
	// Any path in the input is discarded in favour of newpath.
	u.Path = newpath
	if removeFragment {
		u.Fragment = ""
	}
	return u, nil
}