package run

import (
	"crypto/tls"
	"fmt"
	"io"
	"log"
	"net"
	"os"
	"path/filepath"
	"runtime"
	"runtime/pprof"
	"time"

	"github.com/influxdata/influxdb"
	"github.com/influxdata/influxdb/coordinator"
	"github.com/influxdata/influxdb/flux/control"
	"github.com/influxdata/influxdb/logger"
	"github.com/influxdata/influxdb/models"
	"github.com/influxdata/influxdb/monitor"
	"github.com/influxdata/influxdb/query"
	"github.com/influxdata/influxdb/services/collectd"
	"github.com/influxdata/influxdb/services/continuous_querier"
	"github.com/influxdata/influxdb/services/graphite"
	"github.com/influxdata/influxdb/services/httpd"
	"github.com/influxdata/influxdb/services/meta"
	"github.com/influxdata/influxdb/services/opentsdb"
	"github.com/influxdata/influxdb/services/precreator"
	"github.com/influxdata/influxdb/services/retention"
	"github.com/influxdata/influxdb/services/snapshotter"
	"github.com/influxdata/influxdb/services/storage"
	"github.com/influxdata/influxdb/services/subscriber"
	"github.com/influxdata/influxdb/services/udp"
	"github.com/influxdata/influxdb/storage/reads"
	"github.com/influxdata/influxdb/tcp"
	"github.com/influxdata/influxdb/tsdb"
	client "github.com/influxdata/usage-client/v1"
	"go.uber.org/zap"

	// Initialize the engine package
	_ "github.com/influxdata/influxdb/tsdb/engine"
	// Initialize the index package
	_ "github.com/influxdata/influxdb/tsdb/index"
)

// startTime records when this process started (UTC). It is read by the usage
// reporter to compute process uptime.
var startTime time.Time

func init() {
	startTime = time.Now().UTC()
}

// BuildInfo represents the build details for the server code.
type BuildInfo struct {
	Version string
	Commit  string
	Branch  string
	Time    string
}

// Server represents a container for the metadata and storage data and services.
// It is built using a Config and it manages the startup and shutdown of all
// services in the proper order.
type Server struct {
	buildInfo BuildInfo

	// err multiplexes out-of-band errors from services; closing is closed on
	// shutdown to stop background goroutines (see startServerReporting).
	err     chan error
	closing chan struct{}

	BindAddress string
	Listener    net.Listener

	Logger *zap.Logger

	MetaClient *meta.Client

	TSDBStore     *tsdb.Store
	QueryExecutor *query.Executor
	PointsWriter  *coordinator.PointsWriter
	Subscriber    *subscriber.Service

	Services []Service

	// These references are required for the tcp muxer.
	SnapshotterService *snapshotter.Service

	Monitor *monitor.Monitor

	// Server reporting and registration
	reportingDisabled bool

	// Profiling file paths; when non-empty, profiles are written on Open/Close.
	CPUProfile string
	MemProfile string

	// httpAPIAddr is the host:port combination for the main HTTP API for querying and writing data
	httpAPIAddr string

	// httpUseTLS specifies if we should use a TLS connection to the http servers
	httpUseTLS bool

	// tcpAddr is the host:port combination for the TCP listener that services mux onto
	tcpAddr string

	config *Config
}

// updateTLSConfig stores with into the tls config pointed at by into but only if with is not nil
// and into is nil. Think of it as setting the default value.
func updateTLSConfig(into **tls.Config, with *tls.Config) {
	if with != nil && into != nil && *into == nil {
		*into = with
	}
}

// NewServer returns a new instance of Server built from a config.
// It performs on-disk migrations (node.json relocation), refuses to start when
// a legacy raft.db is present, and opens the meta client; all other services
// are constructed here but not opened until Open is called.
func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
	// First grab the base tls config we will use for all clients and servers
	tlsConfig, err := c.TLS.Parse()
	if err != nil {
		return nil, fmt.Errorf("tls configuration: %v", err)
	}

	// Update the TLS values on each of the configs to be the parsed one if
	// not already specified (set the default).
	updateTLSConfig(&c.HTTPD.TLS, tlsConfig)
	updateTLSConfig(&c.Subscriber.TLS, tlsConfig)
	for i := range c.OpenTSDBInputs {
		updateTLSConfig(&c.OpenTSDBInputs[i].TLS, tlsConfig)
	}

	// We need to ensure that a meta directory always exists even if
	// we don't start the meta store. node.json is always stored under
	// the meta directory.
	if err := os.MkdirAll(c.Meta.Dir, 0777); err != nil {
		return nil, fmt.Errorf("mkdir all: %s", err)
	}

	// 0.10-rc1 and prior would sometimes put the node.json at the root
	// dir which breaks backup/restore and restarting nodes. This moves
	// the file from the root so it's always under the meta dir.
	oldPath := filepath.Join(filepath.Dir(c.Meta.Dir), "node.json")
	newPath := filepath.Join(c.Meta.Dir, "node.json")

	if _, err := os.Stat(oldPath); err == nil {
		if err := os.Rename(oldPath, newPath); err != nil {
			return nil, err
		}
	}

	// A missing node file is fine (fresh install); any other load error is fatal.
	_, err = influxdb.LoadNode(c.Meta.Dir)
	if err != nil {
		if !os.IsNotExist(err) {
			return nil, err
		}
	}

	if err := raftDBExists(c.Meta.Dir); err != nil {
		return nil, err
	}

	// In 0.10.0 bind-address got moved to the top level. Check
	// the old location to keep things backwards compatible.
	bind := c.BindAddress

	s := &Server{
		buildInfo: *buildInfo,
		err:       make(chan error),
		closing:   make(chan struct{}),

		BindAddress: bind,

		Logger: logger.New(os.Stderr),

		MetaClient: meta.NewClient(c.Meta),

		reportingDisabled: c.ReportingDisabled,

		httpAPIAddr: c.HTTPD.BindAddress,
		httpUseTLS:  c.HTTPD.HTTPSEnabled,
		tcpAddr:     bind,

		config: c,
	}
	s.Monitor = monitor.New(s, c.Monitor)
	s.config.registerDiagnostics(s.Monitor)

	if err := s.MetaClient.Open(); err != nil {
		return nil, err
	}

	s.TSDBStore = tsdb.NewStore(c.Data.Dir)
	s.TSDBStore.EngineOptions.Config = c.Data

	// Copy TSDB configuration.
	s.TSDBStore.EngineOptions.EngineVersion = c.Data.Engine
	s.TSDBStore.EngineOptions.IndexVersion = c.Data.Index

	// Create the Subscriber service
	s.Subscriber = subscriber.NewService(c.Subscriber)

	// Initialize points writer.
	s.PointsWriter = coordinator.NewPointsWriter()
	s.PointsWriter.WriteTimeout = time.Duration(c.Coordinator.WriteTimeout)
	s.PointsWriter.TSDBStore = s.TSDBStore

	// Initialize query executor.
	s.QueryExecutor = query.NewExecutor()
	s.QueryExecutor.StatementExecutor = &coordinator.StatementExecutor{
		MetaClient:  s.MetaClient,
		TaskManager: s.QueryExecutor.TaskManager,
		TSDBStore:   s.TSDBStore,
		ShardMapper: &coordinator.LocalShardMapper{
			MetaClient: s.MetaClient,
			TSDBStore:  coordinator.LocalTSDBStore{Store: s.TSDBStore},
		},
		Monitor:           s.Monitor,
		PointsWriter:      s.PointsWriter,
		MaxSelectPointN:   c.Coordinator.MaxSelectPointN,
		MaxSelectSeriesN:  c.Coordinator.MaxSelectSeriesN,
		MaxSelectBucketsN: c.Coordinator.MaxSelectBucketsN,
	}
	s.QueryExecutor.TaskManager.QueryTimeout = time.Duration(c.Coordinator.QueryTimeout)
	s.QueryExecutor.TaskManager.LogQueriesAfter = time.Duration(c.Coordinator.LogQueriesAfter)
	s.QueryExecutor.TaskManager.MaxConcurrentQueries = c.Coordinator.MaxConcurrentQueries

	// Initialize the monitor
	s.Monitor.Version = s.buildInfo.Version
	s.Monitor.Commit = s.buildInfo.Commit
	s.Monitor.Branch = s.buildInfo.Branch
	s.Monitor.BuildTime = s.buildInfo.Time
	s.Monitor.PointsWriter = (*monitorPointsWriter)(s.PointsWriter)
	return s, nil
}

// Statistics returns statistics for the services running in the Server.
func (s *Server) Statistics(tags map[string]string) []models.Statistic {
	var statistics []models.Statistic
	statistics = append(statistics, s.QueryExecutor.Statistics(tags)...)
	statistics = append(statistics, s.TSDBStore.Statistics(tags)...)
	statistics = append(statistics, s.PointsWriter.Statistics(tags)...)
	statistics = append(statistics, s.Subscriber.Statistics(tags)...)
	// Services report stats only if they implement monitor.Reporter.
	for _, srv := range s.Services {
		if m, ok := srv.(monitor.Reporter); ok {
			statistics = append(statistics, m.Statistics(tags)...)
		}
	}
	return statistics
}

// appendSnapshotterService creates the snapshotter service and keeps a direct
// reference to it so Open can wire it onto the TCP mux.
func (s *Server) appendSnapshotterService() {
	srv := snapshotter.NewService()
	srv.TSDBStore = s.TSDBStore
	srv.MetaClient = s.MetaClient
	s.Services = append(s.Services, srv)
	s.SnapshotterService = srv
}

// SetLogOutput sets the logger used for all messages. It must not be called
// after the Open method has been called.
func (s *Server) SetLogOutput(w io.Writer) {
	s.Logger = logger.New(w)
}

// appendMonitorService registers the monitor as a managed service.
func (s *Server) appendMonitorService() {
	s.Services = append(s.Services, s.Monitor)
}

// appendRetentionPolicyService registers the retention enforcement service,
// if enabled in the config.
func (s *Server) appendRetentionPolicyService(c retention.Config) {
	if !c.Enabled {
		return
	}
	srv := retention.NewService(c)
	srv.MetaClient = s.MetaClient
	srv.TSDBStore = s.TSDBStore
	s.Services = append(s.Services, srv)
}

// appendHTTPDService registers the HTTP API service, if enabled, wiring in
// auth, query execution, the points writer, and the flux controller.
func (s *Server) appendHTTPDService(c httpd.Config) {
	if !c.Enabled {
		return
	}
	srv := httpd.NewService(c)
	srv.Handler.MetaClient = s.MetaClient
	authorizer := meta.NewQueryAuthorizer(s.MetaClient)
	srv.Handler.QueryAuthorizer = authorizer
	srv.Handler.WriteAuthorizer = meta.NewWriteAuthorizer(s.MetaClient)
	srv.Handler.QueryExecutor = s.QueryExecutor
	srv.Handler.Monitor = s.Monitor
	srv.Handler.PointsWriter = s.PointsWriter
	srv.Handler.Version = s.buildInfo.Version
	srv.Handler.BuildType = "OSS"
	ss := storage.NewStore(s.TSDBStore, s.MetaClient)
	srv.Handler.Store = ss
	srv.Handler.Controller = control.NewController(s.MetaClient, reads.NewReader(ss), authorizer, c.AuthEnabled, s.Logger)

	s.Services = append(s.Services, srv)
}

// appendCollectdService registers a collectd input service, if enabled.
func (s *Server) appendCollectdService(c collectd.Config) {
	if !c.Enabled {
		return
	}
	srv := collectd.NewService(c)
	srv.MetaClient = s.MetaClient
	srv.PointsWriter = s.PointsWriter
	s.Services = append(s.Services, srv)
}

// appendOpenTSDBService registers an OpenTSDB input service, if enabled.
// Construction can fail (unlike most inputs), so the error is propagated.
func (s *Server) appendOpenTSDBService(c opentsdb.Config) error {
	if !c.Enabled {
		return nil
	}
	srv, err := opentsdb.NewService(c)
	if err != nil {
		return err
	}
	srv.PointsWriter = s.PointsWriter
	srv.MetaClient = s.MetaClient
	s.Services = append(s.Services, srv)
	return nil
}

// appendGraphiteService registers a graphite input service, if enabled.
func (s *Server) appendGraphiteService(c graphite.Config) error {
	if !c.Enabled {
		return nil
	}
	srv, err := graphite.NewService(c)
	if err != nil {
		return err
	}

	srv.PointsWriter = s.PointsWriter
	srv.MetaClient = s.MetaClient
	srv.Monitor = s.Monitor
	s.Services = append(s.Services, srv)
	return nil
}

// appendPrecreatorService registers the shard precreation service, if enabled.
func (s *Server) appendPrecreatorService(c precreator.Config) error {
	if !c.Enabled {
		return nil
	}
	srv := precreator.NewService(c)
	srv.MetaClient = s.MetaClient
	s.Services = append(s.Services, srv)
	return nil
}

// appendUDPService registers a UDP input service, if enabled.
func (s *Server) appendUDPService(c udp.Config) {
	if !c.Enabled {
		return
	}
	srv := udp.NewService(c)
	srv.PointsWriter = s.PointsWriter
	srv.MetaClient = s.MetaClient
	s.Services = append(s.Services, srv)
}

// appendContinuousQueryService registers the continuous query service, if enabled.
func (s *Server) appendContinuousQueryService(c continuous_querier.Config) {
	if !c.Enabled {
		return
	}
	srv := continuous_querier.NewService(c)
	srv.MetaClient = s.MetaClient
	srv.QueryExecutor = s.QueryExecutor
	srv.Monitor = s.Monitor
	s.Services = append(s.Services, srv)
}

// Err returns an error channel that multiplexes all out of band errors received from all services.
func (s *Server) Err() <-chan error { return s.err }

// Open opens the meta and data store and all services.
func (s *Server) Open() error {
	// Start profiling, if set.
	startProfile(s.CPUProfile, s.MemProfile)

	// Open shared TCP connection.
	ln, err := net.Listen("tcp", s.BindAddress)
	if err != nil {
		return fmt.Errorf("listen: %s", err)
	}
	s.Listener = ln

	// Multiplex listener.
	mux := tcp.NewMux()
	go mux.Serve(ln)

	// Append services. Order here determines Open/Close order below.
	s.appendMonitorService()
	s.appendPrecreatorService(s.config.Precreator)
	s.appendSnapshotterService()
	s.appendContinuousQueryService(s.config.ContinuousQuery)
	s.appendHTTPDService(s.config.HTTPD)
	s.appendRetentionPolicyService(s.config.Retention)
	for _, i := range s.config.GraphiteInputs {
		if err := s.appendGraphiteService(i); err != nil {
			return err
		}
	}
	for _, i := range s.config.CollectdInputs {
		s.appendCollectdService(i)
	}
	for _, i := range s.config.OpenTSDBInputs {
		if err := s.appendOpenTSDBService(i); err != nil {
			return err
		}
	}
	for _, i := range s.config.UDPInputs {
		s.appendUDPService(i)
	}

	s.Subscriber.MetaClient = s.MetaClient
	s.PointsWriter.MetaClient = s.MetaClient
	s.Monitor.MetaClient = s.MetaClient

	s.SnapshotterService.Listener = mux.Listen(snapshotter.MuxHeader)

	// Configure logging for all services and clients.
	if s.config.Meta.LoggingEnabled {
		s.MetaClient.WithLogger(s.Logger)
	}
	s.TSDBStore.WithLogger(s.Logger)
	if s.config.Data.QueryLogEnabled {
		s.QueryExecutor.WithLogger(s.Logger)
	}
	s.PointsWriter.WithLogger(s.Logger)
	s.Subscriber.WithLogger(s.Logger)
	for _, svc := range s.Services {
		svc.WithLogger(s.Logger)
	}
	s.SnapshotterService.WithLogger(s.Logger)
	s.Monitor.WithLogger(s.Logger)

	// Open TSDB store.
	if err := s.TSDBStore.Open(); err != nil {
		return fmt.Errorf("open tsdb store: %s", err)
	}

	// Open the subscriber service
	if err := s.Subscriber.Open(); err != nil {
		return fmt.Errorf("open subscriber: %s", err)
	}

	// Open the points writer service
	if err := s.PointsWriter.Open(); err != nil {
		return fmt.Errorf("open points writer: %s", err)
	}

	s.PointsWriter.AddWriteSubscriber(s.Subscriber.Points())

	for _, service := range s.Services {
		if err := service.Open(); err != nil {
			return fmt.Errorf("open service: %s", err)
		}
	}

	// Start the reporting service, if not disabled.
	if !s.reportingDisabled {
		go s.startServerReporting()
	}

	return nil
}

// Close shuts down the meta and data stores and all services.
func (s *Server) Close() error {
	stopProfile()

	// Close the listener first to stop any new connections
	if s.Listener != nil {
		s.Listener.Close()
	}

	// Close services to allow any inflight requests to complete
	// and prevent new requests from being accepted.
	for _, service := range s.Services {
		service.Close()
	}

	s.config.deregisterDiagnostics(s.Monitor)

	if s.PointsWriter != nil {
		s.PointsWriter.Close()
	}

	if s.QueryExecutor != nil {
		s.QueryExecutor.Close()
	}

	// Close the TSDBStore, no more reads or writes at this point
	if s.TSDBStore != nil {
		s.TSDBStore.Close()
	}

	if s.Subscriber != nil {
		s.Subscriber.Close()
	}

	if s.MetaClient != nil {
		s.MetaClient.Close()
	}

	// Signal background goroutines (e.g. the usage reporter) to stop.
	close(s.closing)
	return nil
}

// startServerReporting starts periodic server reporting.
func (s *Server) startServerReporting() {
	// Report once immediately, then every 24 hours until the server closes.
	s.reportServer()

	ticker := time.NewTicker(24 * time.Hour)
	defer ticker.Stop()
	for {
		select {
		case <-s.closing:
			return
		case <-ticker.C:
			s.reportServer()
		}
	}
}

// reportServer reports usage statistics about the system.
// Cardinality lookups that fail are logged and skipped rather than aborting
// the report; the upload itself is fire-and-forget.
func (s *Server) reportServer() {
	dbs := s.MetaClient.Databases()
	numDatabases := len(dbs)

	var (
		numMeasurements int64
		numSeries       int64
	)

	for _, db := range dbs {
		name := db.Name
		n, err := s.TSDBStore.SeriesCardinality(name)
		if err != nil {
			s.Logger.Error(fmt.Sprintf("Unable to get series cardinality for database %s: %v", name, err))
		} else {
			numSeries += n
		}

		n, err = s.TSDBStore.MeasurementsCardinality(name)
		if err != nil {
			s.Logger.Error(fmt.Sprintf("Unable to get measurement cardinality for database %s: %v", name, err))
		} else {
			numMeasurements += n
		}
	}

	clusterID := s.MetaClient.ClusterID()
	cl := client.New("")
	usage := client.Usage{
		Product: "influxdb",
		Data: []client.UsageData{
			{
				Values: client.Values{
					"os":               runtime.GOOS,
					"arch":             runtime.GOARCH,
					"version":          s.buildInfo.Version,
					"cluster_id":       fmt.Sprintf("%v", clusterID),
					"num_series":       numSeries,
					"num_measurements": numMeasurements,
					"num_databases":    numDatabases,
					"uptime":           time.Since(startTime).Seconds(),
				},
			},
		},
	}

	s.Logger.Info("Sending usage statistics to usage.influxdata.com")

	// Best-effort upload; any error from Save is intentionally dropped.
	go cl.Save(usage)
}

// Service represents a service attached to the server.
type Service interface {
	WithLogger(log *zap.Logger)
	Open() error
	Close() error
}

// prof stores the file locations of active profiles.
var prof struct {
	cpu *os.File
	mem *os.File
}

// startProfile initializes the cpu and memory profile, if specified.
590func startProfile(cpuprofile, memprofile string) { 591 if cpuprofile != "" { 592 f, err := os.Create(cpuprofile) 593 if err != nil { 594 log.Fatalf("cpuprofile: %v", err) 595 } 596 log.Printf("writing CPU profile to: %s\n", cpuprofile) 597 prof.cpu = f 598 pprof.StartCPUProfile(prof.cpu) 599 } 600 601 if memprofile != "" { 602 f, err := os.Create(memprofile) 603 if err != nil { 604 log.Fatalf("memprofile: %v", err) 605 } 606 log.Printf("writing mem profile to: %s\n", memprofile) 607 prof.mem = f 608 runtime.MemProfileRate = 4096 609 } 610 611} 612 613// StopProfile closes the cpu and memory profiles if they are running. 614func stopProfile() { 615 if prof.cpu != nil { 616 pprof.StopCPUProfile() 617 prof.cpu.Close() 618 log.Println("CPU profile stopped") 619 } 620 if prof.mem != nil { 621 pprof.Lookup("heap").WriteTo(prof.mem, 0) 622 prof.mem.Close() 623 log.Println("mem profile stopped") 624 } 625} 626 627// monitorPointsWriter is a wrapper around `coordinator.PointsWriter` that helps 628// to prevent a circular dependency between the `cluster` and `monitor` packages. 629type monitorPointsWriter coordinator.PointsWriter 630 631func (pw *monitorPointsWriter) WritePoints(database, retentionPolicy string, points models.Points) error { 632 return (*coordinator.PointsWriter)(pw).WritePointsPrivileged(database, retentionPolicy, models.ConsistencyLevelAny, points) 633} 634 635func raftDBExists(dir string) error { 636 // Check to see if there is a raft db, if so, error out with a message 637 // to downgrade, export, and then import the meta data 638 raftFile := filepath.Join(dir, "raft.db") 639 if _, err := os.Stat(raftFile); err == nil { 640 return fmt.Errorf("detected %s. To proceed, you'll need to either 1) downgrade to v0.11.x, export your metadata, upgrade to the current version again, and then import the metadata or 2) delete the file, which will effectively reset your database. 
For more assistance with the upgrade, see: https://docs.influxdata.com/influxdb/v0.12/administration/upgrading/", raftFile) 641 } 642 return nil 643} 644