1// Package graphite provides a service for InfluxDB to ingest data via the graphite protocol. 2package graphite // import "github.com/influxdata/influxdb/services/graphite" 3 4import ( 5 "bufio" 6 "fmt" 7 "math" 8 "net" 9 "strings" 10 "sync" 11 "sync/atomic" 12 "time" 13 14 "github.com/influxdata/influxdb/logger" 15 "github.com/influxdata/influxdb/models" 16 "github.com/influxdata/influxdb/monitor/diagnostics" 17 "github.com/influxdata/influxdb/services/meta" 18 "github.com/influxdata/influxdb/tsdb" 19 "go.uber.org/zap" 20) 21 22const udpBufferSize = 65536 23 24// statistics gathered by the graphite package. 25const ( 26 statPointsReceived = "pointsRx" 27 statBytesReceived = "bytesRx" 28 statPointsParseFail = "pointsParseFail" 29 statPointsNaNFail = "pointsNaNFail" 30 statBatchesTransmitted = "batchesTx" 31 statPointsTransmitted = "pointsTx" 32 statBatchesTransmitFail = "batchesTxFail" 33 statConnectionsActive = "connsActive" 34 statConnectionsHandled = "connsHandled" 35) 36 37type tcpConnection struct { 38 conn net.Conn 39 connectTime time.Time 40} 41 42func (c *tcpConnection) Close() { 43 c.conn.Close() 44} 45 46// Service represents a Graphite service. 47type Service struct { 48 bindAddress string 49 database string 50 retentionPolicy string 51 protocol string 52 batchSize int 53 batchPending int 54 batchTimeout time.Duration 55 udpReadBuffer int 56 57 batcher *tsdb.PointBatcher 58 parser *Parser 59 60 logger *zap.Logger 61 stats *Statistics 62 defaultTags models.StatisticTags 63 64 tcpConnectionsMu sync.Mutex 65 tcpConnections map[string]*tcpConnection 66 diagsKey string 67 68 ln net.Listener 69 addr net.Addr 70 udpConn *net.UDPConn 71 72 wg sync.WaitGroup 73 74 mu sync.RWMutex 75 ready bool // Has the required database been created? 76 done chan struct{} // Is the service closing or closed? 77 78 Monitor interface { 79 RegisterDiagnosticsClient(name string, client diagnostics.Client) 80 DeregisterDiagnosticsClient(name string) 81 } 82 PointsWriter interface { 83 WritePointsPrivileged(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error 84 } 85 MetaClient interface { 86 CreateDatabaseWithRetentionPolicy(name string, spec *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error) 87 CreateRetentionPolicy(database string, spec *meta.RetentionPolicySpec, makeDefault bool) (*meta.RetentionPolicyInfo, error) 88 Database(name string) *meta.DatabaseInfo 89 RetentionPolicy(database, name string) (*meta.RetentionPolicyInfo, error) 90 } 91} 92 93// NewService returns an instance of the Graphite service. 94func NewService(c Config) (*Service, error) { 95 // Use defaults where necessary. 96 d := c.WithDefaults() 97 98 s := Service{ 99 bindAddress: d.BindAddress, 100 database: d.Database, 101 retentionPolicy: d.RetentionPolicy, 102 protocol: d.Protocol, 103 batchSize: d.BatchSize, 104 batchPending: d.BatchPending, 105 udpReadBuffer: d.UDPReadBuffer, 106 batchTimeout: time.Duration(d.BatchTimeout), 107 logger: zap.NewNop(), 108 stats: &Statistics{}, 109 defaultTags: models.StatisticTags{"proto": d.Protocol, "bind": d.BindAddress}, 110 tcpConnections: make(map[string]*tcpConnection), 111 diagsKey: strings.Join([]string{"graphite", d.Protocol, d.BindAddress}, ":"), 112 } 113 114 parser, err := NewParserWithOptions(Options{ 115 Templates: d.Templates, 116 DefaultTags: d.DefaultTags(), 117 Separator: d.Separator}) 118 119 if err != nil { 120 return nil, err 121 } 122 s.parser = parser 123 124 return &s, nil 125} 126 127// Open starts the Graphite input processing data. 128func (s *Service) Open() error { 129 s.mu.Lock() 130 defer s.mu.Unlock() 131 132 if s.done != nil { 133 return nil // Already open. 134 } 135 s.done = make(chan struct{}) 136 137 s.logger.Info("Starting graphite service", 138 zap.Int("batch_size", s.batchSize), 139 logger.DurationLiteral("batch_timeout", s.batchTimeout)) 140 141 // Register diagnostics if a Monitor service is available. 142 if s.Monitor != nil { 143 s.Monitor.RegisterDiagnosticsClient(s.diagsKey, s) 144 } 145 146 s.batcher = tsdb.NewPointBatcher(s.batchSize, s.batchPending, s.batchTimeout) 147 s.batcher.Start() 148 149 // Start processing batches. 150 s.wg.Add(1) 151 go s.processBatches(s.batcher) 152 153 var err error 154 if strings.ToLower(s.protocol) == "tcp" { 155 s.addr, err = s.openTCPServer() 156 } else if strings.ToLower(s.protocol) == "udp" { 157 s.addr, err = s.openUDPServer() 158 } else { 159 return fmt.Errorf("unrecognized Graphite input protocol %s", s.protocol) 160 } 161 if err != nil { 162 return err 163 } 164 165 s.logger.Info("Listening", 166 zap.String("protocol", s.protocol), 167 zap.Stringer("addr", s.addr)) 168 return nil 169} 170 171func (s *Service) closeAllConnections() { 172 s.tcpConnectionsMu.Lock() 173 defer s.tcpConnectionsMu.Unlock() 174 for _, c := range s.tcpConnections { 175 c.Close() 176 } 177} 178 179// Close stops all data processing on the Graphite input. 180func (s *Service) Close() error { 181 if wait := func() bool { 182 s.mu.Lock() 183 defer s.mu.Unlock() 184 185 if s.closed() { 186 return false 187 } 188 close(s.done) 189 190 s.closeAllConnections() 191 192 if s.ln != nil { 193 s.ln.Close() 194 } 195 if s.udpConn != nil { 196 s.udpConn.Close() 197 } 198 199 if s.batcher != nil { 200 s.batcher.Stop() 201 } 202 203 if s.Monitor != nil { 204 s.Monitor.DeregisterDiagnosticsClient(s.diagsKey) 205 } 206 return true 207 }(); !wait { 208 return nil // Already closed. 209 } 210 211 s.wg.Wait() 212 213 s.mu.Lock() 214 s.done = nil 215 s.mu.Unlock() 216 217 return nil 218} 219 220// Closed returns true if the service is currently closed. 221func (s *Service) Closed() bool { 222 s.mu.Lock() 223 defer s.mu.Unlock() 224 return s.closed() 225} 226 227func (s *Service) closed() bool { 228 select { 229 case <-s.done: 230 // Service is closing. 231 return true 232 default: 233 } 234 return s.done == nil 235} 236 237// createInternalStorage ensures that the required database has been created. 238func (s *Service) createInternalStorage() error { 239 s.mu.RLock() 240 ready := s.ready 241 s.mu.RUnlock() 242 if ready { 243 return nil 244 } 245 246 if db := s.MetaClient.Database(s.database); db != nil { 247 if rp, _ := s.MetaClient.RetentionPolicy(s.database, s.retentionPolicy); rp == nil { 248 spec := meta.RetentionPolicySpec{Name: s.retentionPolicy} 249 if _, err := s.MetaClient.CreateRetentionPolicy(s.database, &spec, true); err != nil { 250 return err 251 } 252 } 253 } else { 254 spec := meta.RetentionPolicySpec{Name: s.retentionPolicy} 255 if _, err := s.MetaClient.CreateDatabaseWithRetentionPolicy(s.database, &spec); err != nil { 256 return err 257 } 258 } 259 260 // The service is now ready. 261 s.mu.Lock() 262 s.ready = true 263 s.mu.Unlock() 264 return nil 265} 266 267// WithLogger sets the logger on the service. 268func (s *Service) WithLogger(log *zap.Logger) { 269 s.logger = log.With( 270 zap.String("service", "graphite"), 271 zap.String("addr", s.bindAddress), 272 ) 273} 274 275// Statistics maintains statistics for the graphite service. 276type Statistics struct { 277 PointsReceived int64 278 BytesReceived int64 279 PointsParseFail int64 280 PointsNaNFail int64 281 BatchesTransmitted int64 282 PointsTransmitted int64 283 BatchesTransmitFail int64 284 ActiveConnections int64 285 HandledConnections int64 286} 287 288// Statistics returns statistics for periodic monitoring. 289func (s *Service) Statistics(tags map[string]string) []models.Statistic { 290 return []models.Statistic{{ 291 Name: "graphite", 292 Tags: s.defaultTags.Merge(tags), 293 Values: map[string]interface{}{ 294 statPointsReceived: atomic.LoadInt64(&s.stats.PointsReceived), 295 statBytesReceived: atomic.LoadInt64(&s.stats.BytesReceived), 296 statPointsParseFail: atomic.LoadInt64(&s.stats.PointsParseFail), 297 statPointsNaNFail: atomic.LoadInt64(&s.stats.PointsNaNFail), 298 statBatchesTransmitted: atomic.LoadInt64(&s.stats.BatchesTransmitted), 299 statPointsTransmitted: atomic.LoadInt64(&s.stats.PointsTransmitted), 300 statBatchesTransmitFail: atomic.LoadInt64(&s.stats.BatchesTransmitFail), 301 statConnectionsActive: atomic.LoadInt64(&s.stats.ActiveConnections), 302 statConnectionsHandled: atomic.LoadInt64(&s.stats.HandledConnections), 303 }, 304 }} 305} 306 307// Addr returns the address the Service binds to. 308func (s *Service) Addr() net.Addr { 309 return s.addr 310} 311 312// openTCPServer opens the Graphite input in TCP mode and starts processing data. 313func (s *Service) openTCPServer() (net.Addr, error) { 314 ln, err := net.Listen("tcp", s.bindAddress) 315 if err != nil { 316 return nil, err 317 } 318 s.ln = ln 319 320 s.wg.Add(1) 321 go func() { 322 defer s.wg.Done() 323 for { 324 conn, err := s.ln.Accept() 325 if opErr, ok := err.(*net.OpError); ok && !opErr.Temporary() { 326 s.logger.Info("Graphite TCP listener closed") 327 return 328 } 329 if err != nil { 330 s.logger.Info("Error accepting TCP connection", zap.Error(err)) 331 continue 332 } 333 334 s.wg.Add(1) 335 go s.handleTCPConnection(conn) 336 } 337 }() 338 return ln.Addr(), nil 339} 340 341// handleTCPConnection services an individual TCP connection for the Graphite input. 342func (s *Service) handleTCPConnection(conn net.Conn) { 343 defer s.wg.Done() 344 defer conn.Close() 345 defer atomic.AddInt64(&s.stats.ActiveConnections, -1) 346 defer s.untrackConnection(conn) 347 atomic.AddInt64(&s.stats.ActiveConnections, 1) 348 atomic.AddInt64(&s.stats.HandledConnections, 1) 349 s.trackConnection(conn) 350 351 reader := bufio.NewReader(conn) 352 353 for { 354 // Read up to the next newline. 355 buf, err := reader.ReadBytes('\n') 356 if err != nil { 357 return 358 } 359 360 // Trim the buffer, even though there should be no padding 361 line := strings.TrimSpace(string(buf)) 362 363 atomic.AddInt64(&s.stats.PointsReceived, 1) 364 atomic.AddInt64(&s.stats.BytesReceived, int64(len(buf))) 365 s.handleLine(line) 366 } 367} 368 369func (s *Service) trackConnection(c net.Conn) { 370 s.tcpConnectionsMu.Lock() 371 defer s.tcpConnectionsMu.Unlock() 372 s.tcpConnections[c.RemoteAddr().String()] = &tcpConnection{ 373 conn: c, 374 connectTime: time.Now().UTC(), 375 } 376} 377func (s *Service) untrackConnection(c net.Conn) { 378 s.tcpConnectionsMu.Lock() 379 defer s.tcpConnectionsMu.Unlock() 380 delete(s.tcpConnections, c.RemoteAddr().String()) 381} 382 383// openUDPServer opens the Graphite input in UDP mode and starts processing incoming data. 384func (s *Service) openUDPServer() (net.Addr, error) { 385 addr, err := net.ResolveUDPAddr("udp", s.bindAddress) 386 if err != nil { 387 return nil, err 388 } 389 390 s.udpConn, err = net.ListenUDP("udp", addr) 391 if err != nil { 392 return nil, err 393 } 394 395 if s.udpReadBuffer != 0 { 396 err = s.udpConn.SetReadBuffer(s.udpReadBuffer) 397 if err != nil { 398 return nil, fmt.Errorf("unable to set UDP read buffer to %d: %s", 399 s.udpReadBuffer, err) 400 } 401 } 402 403 buf := make([]byte, udpBufferSize) 404 s.wg.Add(1) 405 go func() { 406 defer s.wg.Done() 407 for { 408 n, _, err := s.udpConn.ReadFromUDP(buf) 409 if err != nil { 410 s.udpConn.Close() 411 return 412 } 413 414 lines := strings.Split(string(buf[:n]), "\n") 415 for _, line := range lines { 416 s.handleLine(line) 417 } 418 atomic.AddInt64(&s.stats.PointsReceived, int64(len(lines))) 419 atomic.AddInt64(&s.stats.BytesReceived, int64(n)) 420 } 421 }() 422 return s.udpConn.LocalAddr(), nil 423} 424 425func (s *Service) handleLine(line string) { 426 if line == "" { 427 return 428 } 429 430 // Parse it. 431 point, err := s.parser.Parse(line) 432 if err != nil { 433 switch err := err.(type) { 434 case *UnsupportedValueError: 435 // Graphite ignores NaN values with no error. 436 if math.IsNaN(err.Value) { 437 atomic.AddInt64(&s.stats.PointsNaNFail, 1) 438 return 439 } 440 } 441 s.logger.Info("Unable to parse line", zap.String("line", line), zap.Error(err)) 442 atomic.AddInt64(&s.stats.PointsParseFail, 1) 443 return 444 } 445 446 s.batcher.In() <- point 447} 448 449// processBatches continually drains the given batcher and writes the batches to the database. 450func (s *Service) processBatches(batcher *tsdb.PointBatcher) { 451 defer s.wg.Done() 452 for { 453 select { 454 case batch := <-batcher.Out(): 455 // Will attempt to create database if not yet created. 456 if err := s.createInternalStorage(); err != nil { 457 s.logger.Info("Required database or retention policy do not yet exist", zap.Error(err)) 458 continue 459 } 460 461 if err := s.PointsWriter.WritePointsPrivileged(s.database, s.retentionPolicy, models.ConsistencyLevelAny, batch); err == nil { 462 atomic.AddInt64(&s.stats.BatchesTransmitted, 1) 463 atomic.AddInt64(&s.stats.PointsTransmitted, int64(len(batch))) 464 } else { 465 s.logger.Info("Failed to write point batch to database", 466 logger.Database(s.database), zap.Error(err)) 467 atomic.AddInt64(&s.stats.BatchesTransmitFail, 1) 468 } 469 470 case <-s.done: 471 return 472 } 473 } 474} 475 476// Diagnostics returns diagnostics of the graphite service. 477func (s *Service) Diagnostics() (*diagnostics.Diagnostics, error) { 478 s.tcpConnectionsMu.Lock() 479 defer s.tcpConnectionsMu.Unlock() 480 481 d := &diagnostics.Diagnostics{ 482 Columns: []string{"local", "remote", "connect time"}, 483 Rows: make([][]interface{}, 0, len(s.tcpConnections)), 484 } 485 for _, v := range s.tcpConnections { 486 d.Rows = append(d.Rows, []interface{}{v.conn.LocalAddr().String(), v.conn.RemoteAddr().String(), v.connectTime}) 487 } 488 return d, nil 489} 490