1// Package opentsdb provides a service for InfluxDB to ingest data via the opentsdb protocol. 2package opentsdb // import "github.com/influxdata/influxdb/services/opentsdb" 3 4import ( 5 "bufio" 6 "bytes" 7 "crypto/tls" 8 "io" 9 "net" 10 "net/http" 11 "net/textproto" 12 "strconv" 13 "strings" 14 "sync" 15 "sync/atomic" 16 "time" 17 18 "github.com/influxdata/influxdb/logger" 19 "github.com/influxdata/influxdb/models" 20 "github.com/influxdata/influxdb/services/meta" 21 "github.com/influxdata/influxdb/tsdb" 22 "go.uber.org/zap" 23) 24 25// statistics gathered by the openTSDB package. 26const ( 27 statHTTPConnectionsHandled = "httpConnsHandled" 28 statTelnetConnectionsActive = "tlConnsActive" 29 statTelnetConnectionsHandled = "tlConnsHandled" 30 statTelnetPointsReceived = "tlPointsRx" 31 statTelnetBytesReceived = "tlBytesRx" 32 statTelnetReadError = "tlReadErr" 33 statTelnetBadLine = "tlBadLine" 34 statTelnetBadTime = "tlBadTime" 35 statTelnetBadTag = "tlBadTag" 36 statTelnetBadFloat = "tlBadFloat" 37 statBatchesTransmitted = "batchesTx" 38 statPointsTransmitted = "pointsTx" 39 statBatchesTransmitFail = "batchesTxFail" 40 statConnectionsActive = "connsActive" 41 statConnectionsHandled = "connsHandled" 42 statDroppedPointsInvalid = "droppedPointsInvalid" 43) 44 45// Service manages the listener and handler for an HTTP endpoint. 46type Service struct { 47 ln net.Listener // main listener 48 httpln *chanListener // http channel-based listener 49 50 wg sync.WaitGroup 51 tls bool 52 tlsConfig *tls.Config 53 cert string 54 55 mu sync.RWMutex 56 ready bool // Has the required database been created? 57 done chan struct{} // Is the service closing or closed? 58 59 BindAddress string 60 Database string 61 RetentionPolicy string 62 63 PointsWriter interface { 64 WritePointsPrivileged(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error 65 } 66 MetaClient interface { 67 CreateDatabase(name string) (*meta.DatabaseInfo, error) 68 } 69 70 // Points received over the telnet protocol are batched. 71 batchSize int 72 batchPending int 73 batchTimeout time.Duration 74 batcher *tsdb.PointBatcher 75 76 LogPointErrors bool 77 Logger *zap.Logger 78 79 stats *Statistics 80 defaultTags models.StatisticTags 81} 82 83// NewService returns a new instance of Service. 84func NewService(c Config) (*Service, error) { 85 // Use defaults where necessary. 86 d := c.WithDefaults() 87 88 s := &Service{ 89 tls: d.TLSEnabled, 90 tlsConfig: d.TLS, 91 cert: d.Certificate, 92 BindAddress: d.BindAddress, 93 Database: d.Database, 94 RetentionPolicy: d.RetentionPolicy, 95 batchSize: d.BatchSize, 96 batchPending: d.BatchPending, 97 batchTimeout: time.Duration(d.BatchTimeout), 98 Logger: zap.NewNop(), 99 LogPointErrors: d.LogPointErrors, 100 stats: &Statistics{}, 101 defaultTags: models.StatisticTags{"bind": d.BindAddress}, 102 } 103 if s.tlsConfig == nil { 104 s.tlsConfig = new(tls.Config) 105 } 106 107 return s, nil 108} 109 110// Open starts the service. 111func (s *Service) Open() error { 112 s.mu.Lock() 113 defer s.mu.Unlock() 114 115 if s.done != nil { 116 return nil // Already open. 117 } 118 s.done = make(chan struct{}) 119 120 s.Logger.Info("Starting OpenTSDB service") 121 122 s.batcher = tsdb.NewPointBatcher(s.batchSize, s.batchPending, s.batchTimeout) 123 s.batcher.Start() 124 125 // Start processing batches. 126 s.wg.Add(1) 127 go func() { defer s.wg.Done(); s.processBatches(s.batcher) }() 128 129 // Open listener. 130 if s.tls { 131 cert, err := tls.LoadX509KeyPair(s.cert, s.cert) 132 if err != nil { 133 return err 134 } 135 136 tlsConfig := s.tlsConfig.Clone() 137 tlsConfig.Certificates = []tls.Certificate{cert} 138 139 listener, err := tls.Listen("tcp", s.BindAddress, tlsConfig) 140 if err != nil { 141 return err 142 } 143 144 s.ln = listener 145 } else { 146 listener, err := net.Listen("tcp", s.BindAddress) 147 if err != nil { 148 return err 149 } 150 151 s.ln = listener 152 } 153 s.Logger.Info("Listening on TCP", 154 zap.Stringer("addr", s.ln.Addr()), 155 zap.Bool("tls", s.tls)) 156 s.httpln = newChanListener(s.ln.Addr()) 157 158 // Begin listening for connections. 159 s.wg.Add(2) 160 go func() { defer s.wg.Done(); s.serve() }() 161 go func() { defer s.wg.Done(); s.serveHTTP() }() 162 163 return nil 164} 165 166// Close closes the openTSDB service. 167func (s *Service) Close() error { 168 if wait, err := func() (bool, error) { 169 s.mu.Lock() 170 defer s.mu.Unlock() 171 172 if s.closed() { 173 return false, nil // Already closed. 174 } 175 close(s.done) 176 177 // Close the listeners. 178 if err := s.ln.Close(); err != nil { 179 return false, err 180 } 181 if err := s.httpln.Close(); err != nil { 182 return false, err 183 } 184 185 if s.batcher != nil { 186 s.batcher.Stop() 187 } 188 return true, nil 189 }(); err != nil { 190 return err 191 } else if !wait { 192 return nil 193 } 194 s.wg.Wait() 195 196 s.mu.Lock() 197 s.done = nil 198 s.mu.Unlock() 199 200 return nil 201} 202 203// Closed returns true if the service is currently closed. 204func (s *Service) Closed() bool { 205 s.mu.Lock() 206 defer s.mu.Unlock() 207 return s.closed() 208} 209 210func (s *Service) closed() bool { 211 select { 212 case <-s.done: 213 // Service is closing. 214 return true 215 default: 216 return s.done == nil 217 } 218} 219 220// createInternalStorage ensures that the required database has been created. 221func (s *Service) createInternalStorage() error { 222 s.mu.RLock() 223 ready := s.ready 224 s.mu.RUnlock() 225 if ready { 226 return nil 227 } 228 229 if _, err := s.MetaClient.CreateDatabase(s.Database); err != nil { 230 return err 231 } 232 233 // The service is now ready. 234 s.mu.Lock() 235 s.ready = true 236 s.mu.Unlock() 237 return nil 238} 239 240// WithLogger sets the logger for the service. 241func (s *Service) WithLogger(log *zap.Logger) { 242 s.Logger = log.With(zap.String("service", "opentsdb")) 243} 244 245// Statistics maintains statistics for the subscriber service. 246type Statistics struct { 247 HTTPConnectionsHandled int64 248 ActiveTelnetConnections int64 249 HandledTelnetConnections int64 250 TelnetPointsReceived int64 251 TelnetBytesReceived int64 252 TelnetReadError int64 253 TelnetBadLine int64 254 TelnetBadTime int64 255 TelnetBadTag int64 256 TelnetBadFloat int64 257 BatchesTransmitted int64 258 PointsTransmitted int64 259 BatchesTransmitFail int64 260 ActiveConnections int64 261 HandledConnections int64 262 InvalidDroppedPoints int64 263} 264 265// Statistics returns statistics for periodic monitoring. 266func (s *Service) Statistics(tags map[string]string) []models.Statistic { 267 return []models.Statistic{{ 268 Name: "opentsdb", 269 Tags: s.defaultTags.Merge(tags), 270 Values: map[string]interface{}{ 271 statHTTPConnectionsHandled: atomic.LoadInt64(&s.stats.HTTPConnectionsHandled), 272 statTelnetConnectionsActive: atomic.LoadInt64(&s.stats.ActiveTelnetConnections), 273 statTelnetConnectionsHandled: atomic.LoadInt64(&s.stats.HandledTelnetConnections), 274 statTelnetPointsReceived: atomic.LoadInt64(&s.stats.TelnetPointsReceived), 275 statTelnetBytesReceived: atomic.LoadInt64(&s.stats.TelnetBytesReceived), 276 statTelnetReadError: atomic.LoadInt64(&s.stats.TelnetReadError), 277 statTelnetBadLine: atomic.LoadInt64(&s.stats.TelnetBadLine), 278 statTelnetBadTime: atomic.LoadInt64(&s.stats.TelnetBadTime), 279 statTelnetBadTag: atomic.LoadInt64(&s.stats.TelnetBadTag), 280 statTelnetBadFloat: atomic.LoadInt64(&s.stats.TelnetBadFloat), 281 statBatchesTransmitted: atomic.LoadInt64(&s.stats.BatchesTransmitted), 282 statPointsTransmitted: atomic.LoadInt64(&s.stats.PointsTransmitted), 283 statBatchesTransmitFail: atomic.LoadInt64(&s.stats.BatchesTransmitFail), 284 statConnectionsActive: atomic.LoadInt64(&s.stats.ActiveConnections), 285 statConnectionsHandled: atomic.LoadInt64(&s.stats.HandledConnections), 286 statDroppedPointsInvalid: atomic.LoadInt64(&s.stats.InvalidDroppedPoints), 287 }, 288 }} 289} 290 291// Addr returns the listener's address. Returns nil if listener is closed. 292func (s *Service) Addr() net.Addr { 293 if s.ln == nil { 294 return nil 295 } 296 return s.ln.Addr() 297} 298 299// serve serves the handler from the listener. 300func (s *Service) serve() { 301 for { 302 // Wait for next connection. 303 conn, err := s.ln.Accept() 304 if opErr, ok := err.(*net.OpError); ok && !opErr.Temporary() { 305 s.Logger.Info("OpenTSDB TCP listener closed") 306 return 307 } else if err != nil { 308 s.Logger.Info("Error accepting OpenTSDB", zap.Error(err)) 309 continue 310 } 311 312 // Handle connection in separate goroutine. 313 go s.handleConn(conn) 314 } 315} 316 317// handleConn processes conn. This is run in a separate goroutine. 318func (s *Service) handleConn(conn net.Conn) { 319 defer atomic.AddInt64(&s.stats.ActiveConnections, -1) 320 atomic.AddInt64(&s.stats.ActiveConnections, 1) 321 atomic.AddInt64(&s.stats.HandledConnections, 1) 322 323 // Read header into buffer to check if it's HTTP. 324 var buf bytes.Buffer 325 r := bufio.NewReader(io.TeeReader(conn, &buf)) 326 327 // Attempt to parse connection as HTTP. 328 _, err := http.ReadRequest(r) 329 330 // Rebuild connection from buffer and remaining connection data. 331 bufr := bufio.NewReader(io.MultiReader(&buf, conn)) 332 conn = &readerConn{Conn: conn, r: bufr} 333 334 // If no HTTP parsing error occurred then process as HTTP. 335 if err == nil { 336 atomic.AddInt64(&s.stats.HTTPConnectionsHandled, 1) 337 s.httpln.ch <- conn 338 return 339 } 340 341 // Otherwise handle in telnet format. 342 s.wg.Add(1) 343 s.handleTelnetConn(conn) 344 s.wg.Done() 345} 346 347// handleTelnetConn accepts OpenTSDB's telnet protocol. 348// Each telnet command consists of a line of the form: 349// put sys.cpu.user 1356998400 42.5 host=webserver01 cpu=0 350func (s *Service) handleTelnetConn(conn net.Conn) { 351 defer conn.Close() 352 defer atomic.AddInt64(&s.stats.ActiveTelnetConnections, -1) 353 atomic.AddInt64(&s.stats.ActiveTelnetConnections, 1) 354 atomic.AddInt64(&s.stats.HandledTelnetConnections, 1) 355 356 // Get connection details. 357 remoteAddr := conn.RemoteAddr().String() 358 359 // Wrap connection in a text protocol reader. 360 r := textproto.NewReader(bufio.NewReader(conn)) 361 for { 362 line, err := r.ReadLine() 363 if err != nil { 364 if err != io.EOF { 365 atomic.AddInt64(&s.stats.TelnetReadError, 1) 366 s.Logger.Info("Error reading from OpenTSDB connection", zap.Error(err)) 367 } 368 return 369 } 370 atomic.AddInt64(&s.stats.TelnetPointsReceived, 1) 371 atomic.AddInt64(&s.stats.TelnetBytesReceived, int64(len(line))) 372 373 inputStrs := strings.Fields(line) 374 375 if len(inputStrs) == 1 && inputStrs[0] == "version" { 376 conn.Write([]byte("InfluxDB TSDB proxy")) 377 continue 378 } 379 380 if len(inputStrs) < 4 || inputStrs[0] != "put" { 381 atomic.AddInt64(&s.stats.TelnetBadLine, 1) 382 if s.LogPointErrors { 383 s.Logger.Info("Malformed line", zap.String("line", line), zap.String("remote_addr", remoteAddr)) 384 } 385 continue 386 } 387 388 measurement := inputStrs[1] 389 tsStr := inputStrs[2] 390 valueStr := inputStrs[3] 391 tagStrs := inputStrs[4:] 392 393 var t time.Time 394 ts, err := strconv.ParseInt(tsStr, 10, 64) 395 if err != nil { 396 atomic.AddInt64(&s.stats.TelnetBadTime, 1) 397 if s.LogPointErrors { 398 s.Logger.Info("Malformed time", zap.String("time", tsStr), zap.String("remote_addr", remoteAddr)) 399 } 400 } 401 402 switch len(tsStr) { 403 case 10: 404 t = time.Unix(ts, 0) 405 case 13: 406 t = time.Unix(ts/1000, (ts%1000)*1000) 407 default: 408 atomic.AddInt64(&s.stats.TelnetBadTime, 1) 409 if s.LogPointErrors { 410 s.Logger.Info("Time must be 10 or 13 chars", zap.String("time", tsStr), zap.String("remote_addr", remoteAddr)) 411 } 412 continue 413 } 414 415 tags := make(map[string]string) 416 for t := range tagStrs { 417 parts := strings.SplitN(tagStrs[t], "=", 2) 418 if len(parts) != 2 || parts[0] == "" || parts[1] == "" { 419 atomic.AddInt64(&s.stats.TelnetBadTag, 1) 420 if s.LogPointErrors { 421 s.Logger.Info("Malformed tag data", zap.String("tag", tagStrs[t]), zap.String("remote_addr", remoteAddr)) 422 } 423 continue 424 } 425 k := parts[0] 426 427 tags[k] = parts[1] 428 } 429 430 fields := make(map[string]interface{}) 431 fv, err := strconv.ParseFloat(valueStr, 64) 432 if err != nil { 433 atomic.AddInt64(&s.stats.TelnetBadFloat, 1) 434 if s.LogPointErrors { 435 s.Logger.Info("Bad float", zap.String("value", valueStr), zap.String("remote_addr", remoteAddr)) 436 } 437 continue 438 } 439 fields["value"] = fv 440 441 pt, err := models.NewPoint(measurement, models.NewTags(tags), fields, t) 442 if err != nil { 443 atomic.AddInt64(&s.stats.TelnetBadFloat, 1) 444 if s.LogPointErrors { 445 s.Logger.Info("Bad float", zap.String("value", valueStr), zap.String("remote_addr", remoteAddr)) 446 } 447 continue 448 } 449 s.batcher.In() <- pt 450 } 451} 452 453// serveHTTP handles connections in HTTP format. 454func (s *Service) serveHTTP() { 455 handler := &Handler{ 456 Database: s.Database, 457 RetentionPolicy: s.RetentionPolicy, 458 PointsWriter: s.PointsWriter, 459 Logger: s.Logger, 460 stats: s.stats, 461 } 462 srv := &http.Server{Handler: handler} 463 srv.Serve(s.httpln) 464} 465 466// processBatches continually drains the given batcher and writes the batches to the database. 467func (s *Service) processBatches(batcher *tsdb.PointBatcher) { 468 for { 469 select { 470 case <-s.done: 471 return 472 case batch := <-batcher.Out(): 473 // Will attempt to create database if not yet created. 474 if err := s.createInternalStorage(); err != nil { 475 s.Logger.Info("Required database does not yet exist", logger.Database(s.Database), zap.Error(err)) 476 continue 477 } 478 479 if err := s.PointsWriter.WritePointsPrivileged(s.Database, s.RetentionPolicy, models.ConsistencyLevelAny, batch); err == nil { 480 atomic.AddInt64(&s.stats.BatchesTransmitted, 1) 481 atomic.AddInt64(&s.stats.PointsTransmitted, int64(len(batch))) 482 } else { 483 s.Logger.Info("Failed to write point batch to database", 484 logger.Database(s.Database), zap.Error(err)) 485 atomic.AddInt64(&s.stats.BatchesTransmitFail, 1) 486 } 487 } 488 } 489} 490