package run

import (
	"context"
	"crypto/tls"
	"fmt"
	"io"
	"log"
	"net"
	"os"
	"path/filepath"
	"runtime"
	"runtime/pprof"
	"time"

	"github.com/influxdata/flux"
	"github.com/influxdata/flux/dependencies/testing"
	"github.com/influxdata/influxdb"
	"github.com/influxdata/influxdb/coordinator"
	influxdb2 "github.com/influxdata/influxdb/flux/stdlib/influxdata/influxdb"
	"github.com/influxdata/influxdb/logger"
	"github.com/influxdata/influxdb/models"
	"github.com/influxdata/influxdb/monitor"
	prometheus2 "github.com/influxdata/influxdb/prometheus"
	"github.com/influxdata/influxdb/query"
	"github.com/influxdata/influxdb/query/control"
	"github.com/influxdata/influxdb/services/collectd"
	"github.com/influxdata/influxdb/services/continuous_querier"
	"github.com/influxdata/influxdb/services/graphite"
	"github.com/influxdata/influxdb/services/httpd"
	"github.com/influxdata/influxdb/services/meta"
	"github.com/influxdata/influxdb/services/opentsdb"
	"github.com/influxdata/influxdb/services/precreator"
	"github.com/influxdata/influxdb/services/retention"
	"github.com/influxdata/influxdb/services/snapshotter"
	"github.com/influxdata/influxdb/services/storage"
	"github.com/influxdata/influxdb/services/subscriber"
	"github.com/influxdata/influxdb/services/udp"
	reads "github.com/influxdata/influxdb/storage/flux"
	"github.com/influxdata/influxdb/tcp"
	"github.com/influxdata/influxdb/tsdb"

	// Initialize the engine package
	_ "github.com/influxdata/influxdb/tsdb/engine"

	// Initialize the index package
	_ "github.com/influxdata/influxdb/tsdb/index"
	client "github.com/influxdata/usage-client/v1"
	"github.com/prometheus/client_golang/prometheus"
	"go.uber.org/zap"
)

// startTime records when this process started (UTC). It is used by
// reportServer to compute the "uptime" value sent with usage statistics.
var startTime time.Time

func init() {
	startTime = time.Now().UTC()
}

// BuildInfo represents the build details for the server code.
type BuildInfo struct {
	Version string // release version, e.g. "1.8.0"
	Commit  string // git commit SHA the binary was built from
	Branch  string // git branch the binary was built from
	Time    string // build timestamp
}

// Server represents a container for the metadata and storage data and services.
// It is built using a Config and it manages the startup and shutdown of all
// services in the proper order.
type Server struct {
	buildInfo BuildInfo

	// err multiplexes out-of-band errors from services (see Err);
	// closing is closed by Close to signal background goroutines to stop.
	err     chan error
	closing chan struct{}

	// BindAddress is the host:port the shared TCP listener binds to.
	BindAddress string
	Listener    net.Listener

	Logger    *zap.Logger
	MuxLogger *log.Logger

	MetaClient *meta.Client

	TSDBStore     *tsdb.Store
	QueryExecutor *query.Executor
	PointsWriter  *coordinator.PointsWriter
	Subscriber    *subscriber.Service

	// Services holds every attached service; Open/Close iterate it in order.
	Services []Service

	Prometheus *prometheus.Registry

	// These references are required for the tcp muxer.
	SnapshotterService *snapshotter.Service

	Monitor *monitor.Monitor

	// Server reporting and registration
	reportingDisabled bool

	// Profiling
	CPUProfile            string
	CPUProfileWriteCloser io.WriteCloser
	MemProfile            string
	MemProfileWriteCloser io.WriteCloser

	// httpAPIAddr is the host:port combination for the main HTTP API for querying and writing data
	httpAPIAddr string

	// httpUseTLS specifies if we should use a TLS connection to the http servers
	httpUseTLS bool

	// tcpAddr is the host:port combination for the TCP listener that services mux onto
	tcpAddr string

	config *Config
}

// updateTLSConfig stores with into the tls config pointed at by into but only if with is not nil
// and into is nil. Think of it as setting the default value: an explicitly
// configured per-service TLS config is never overwritten.
func updateTLSConfig(into **tls.Config, with *tls.Config) {
	if with != nil && into != nil && *into == nil {
		*into = with
	}
}

// NewServer returns a new instance of Server built from a config.
// NewServer wires up all server components from the parsed config: TLS
// defaults, the meta directory and client, the TSDB store, the subscriber,
// the points writer, the query executor, and the monitor. Services are not
// appended or started here — that happens in Open.
func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
	// First grab the base tls config we will use for all clients and servers
	tlsConfig, err := c.TLS.Parse()
	if err != nil {
		return nil, fmt.Errorf("tls configuration: %v", err)
	}

	// Update the TLS values on each of the configs to be the parsed one if
	// not already specified (set the default).
	updateTLSConfig(&c.HTTPD.TLS, tlsConfig)
	updateTLSConfig(&c.Subscriber.TLS, tlsConfig)
	for i := range c.OpenTSDBInputs {
		updateTLSConfig(&c.OpenTSDBInputs[i].TLS, tlsConfig)
	}

	// We need to ensure that a meta directory always exists even if
	// we don't start the meta store. node.json is always stored under
	// the meta directory.
	if err := os.MkdirAll(c.Meta.Dir, 0777); err != nil {
		return nil, fmt.Errorf("mkdir all: %s", err)
	}

	// 0.10-rc1 and prior would sometimes put the node.json at the root
	// dir which breaks backup/restore and restarting nodes. This moves
	// the file from the root so it's always under the meta dir.
	oldPath := filepath.Join(filepath.Dir(c.Meta.Dir), "node.json")
	newPath := filepath.Join(c.Meta.Dir, "node.json")

	if _, err := os.Stat(oldPath); err == nil {
		if err := os.Rename(oldPath, newPath); err != nil {
			return nil, err
		}
	}

	// A missing node file is fine (fresh install); any other error is fatal.
	_, err = influxdb.LoadNode(c.Meta.Dir)
	if err != nil {
		if !os.IsNotExist(err) {
			return nil, err
		}
	}

	// Refuse to start on top of a pre-0.12 raft-based meta store.
	if err := raftDBExists(c.Meta.Dir); err != nil {
		return nil, err
	}

	// In 0.10.0 bind-address got moved to the top level; read it from
	// the top-level config to keep things backwards compatible.
	bind := c.BindAddress

	s := &Server{
		buildInfo: *buildInfo,
		err:       make(chan error),
		closing:   make(chan struct{}),

		BindAddress: bind,
		Prometheus:  prometheus.NewRegistry(),

		Logger:    logger.New(os.Stderr),
		MuxLogger: tcp.MuxLogger(os.Stderr),

		MetaClient: meta.NewClient(c.Meta),

		reportingDisabled: c.ReportingDisabled,

		httpAPIAddr: c.HTTPD.BindAddress,
		httpUseTLS:  c.HTTPD.HTTPSEnabled,
		tcpAddr:     bind,

		config: c,
	}
	s.Monitor = monitor.New(s, c.Monitor)
	s.config.registerDiagnostics(s.Monitor)

	// The meta client must be opened before the TSDB store and the
	// services constructed below can use it.
	if err := s.MetaClient.Open(); err != nil {
		return nil, err
	}

	s.TSDBStore = tsdb.NewStore(c.Data.Dir)
	s.TSDBStore.EngineOptions.Config = c.Data

	// Copy TSDB configuration.
	s.TSDBStore.EngineOptions.EngineVersion = c.Data.Engine
	s.TSDBStore.EngineOptions.IndexVersion = c.Data.Index

	// Create the Subscriber service
	s.Subscriber = subscriber.NewService(c.Subscriber)

	// Initialize points writer.
	s.PointsWriter = coordinator.NewPointsWriter()
	s.PointsWriter.WriteTimeout = time.Duration(c.Coordinator.WriteTimeout)
	s.PointsWriter.TSDBStore = s.TSDBStore

	// Initialize query executor.
	s.QueryExecutor = query.NewExecutor()
	s.QueryExecutor.StatementExecutor = &coordinator.StatementExecutor{
		MetaClient:  s.MetaClient,
		TaskManager: s.QueryExecutor.TaskManager,
		TSDBStore:   s.TSDBStore,
		ShardMapper: &coordinator.LocalShardMapper{
			MetaClient: s.MetaClient,
			TSDBStore:  coordinator.LocalTSDBStore{Store: s.TSDBStore},
		},
		StrictErrorHandling: s.TSDBStore.EngineOptions.Config.StrictErrorHandling,
		Monitor:             s.Monitor,
		PointsWriter:        s.PointsWriter,
		MaxSelectPointN:     c.Coordinator.MaxSelectPointN,
		MaxSelectSeriesN:    c.Coordinator.MaxSelectSeriesN,
		MaxSelectBucketsN:   c.Coordinator.MaxSelectBucketsN,
	}
	s.QueryExecutor.TaskManager.QueryTimeout = time.Duration(c.Coordinator.QueryTimeout)
	s.QueryExecutor.TaskManager.LogQueriesAfter = time.Duration(c.Coordinator.LogQueriesAfter)
	s.QueryExecutor.TaskManager.MaxConcurrentQueries = c.Coordinator.MaxConcurrentQueries

	// Initialize the monitor with build metadata and give it a writer that
	// bypasses normal authorization (see monitorPointsWriter).
	s.Monitor.Version = s.buildInfo.Version
	s.Monitor.Commit = s.buildInfo.Commit
	s.Monitor.Branch = s.buildInfo.Branch
	s.Monitor.BuildTime = s.buildInfo.Time
	s.Monitor.PointsWriter = (*monitorPointsWriter)(s.PointsWriter)
	return s, nil
}

// Statistics returns statistics for the services running in the Server.
// It aggregates stats from the executor, store, points writer, subscriber,
// every service implementing monitor.Reporter, and the Prometheus registry.
func (s *Server) Statistics(tags map[string]string) []models.Statistic {
	var statistics []models.Statistic
	statistics = append(statistics, s.QueryExecutor.Statistics(tags)...)
	statistics = append(statistics, s.TSDBStore.Statistics(tags)...)
	statistics = append(statistics, s.PointsWriter.Statistics(tags)...)
	statistics = append(statistics, s.Subscriber.Statistics(tags)...)
	for _, srv := range s.Services {
		if m, ok := srv.(monitor.Reporter); ok {
			statistics = append(statistics, m.Statistics(tags)...)
		}
	}
	// Prometheus gather errors are intentionally ignored: stats reporting
	// is best-effort and must not fail the caller.
	metricFamily, err := s.Prometheus.Gather()
	if err == nil {
		statistics = append(statistics, prometheus2.PrometheusToStatistics(metricFamily, tags)...)
	}
	return statistics
}

// appendSnapshotterService adds the snapshotter and also keeps a direct
// reference on the server, which the tcp muxer needs in Open.
func (s *Server) appendSnapshotterService() {
	srv := snapshotter.NewService()
	srv.TSDBStore = s.TSDBStore
	srv.MetaClient = s.MetaClient
	s.Services = append(s.Services, srv)
	s.SnapshotterService = srv
}

// SetLogOutput sets the logger used for all messages. It must not be called
// after the Open method has been called.
func (s *Server) SetLogOutput(w io.Writer) {
	s.Logger = logger.New(w)
	s.MuxLogger = tcp.MuxLogger(w)
}

// appendMonitorService registers the monitor as a regular service so it is
// opened/closed with the others.
func (s *Server) appendMonitorService() {
	s.Services = append(s.Services, s.Monitor)
}

// appendRetentionPolicyService adds the retention enforcement service
// if it is enabled in the config.
func (s *Server) appendRetentionPolicyService(c retention.Config) {
	if !c.Enabled {
		return
	}
	srv := retention.NewService(c)
	srv.MetaClient = s.MetaClient
	srv.TSDBStore = s.TSDBStore
	s.Services = append(s.Services, srv)
}

// appendHTTPDService adds the HTTP API service, wiring authorizers, the
// storage store, and (optionally) the Flux query controller.
func (s *Server) appendHTTPDService(c httpd.Config) error {
	if !c.Enabled {
		return nil
	}
	srv := httpd.NewService(c)
	srv.Handler.MetaClient = s.MetaClient
	authorizer := meta.NewQueryAuthorizer(s.MetaClient)
	srv.Handler.QueryAuthorizer = authorizer
	srv.Handler.WriteAuthorizer = meta.NewWriteAuthorizer(s.MetaClient)
	srv.Handler.QueryExecutor = s.QueryExecutor
	srv.Handler.Monitor = s.Monitor
	srv.Handler.PointsWriter = s.PointsWriter
	srv.Handler.Version = s.buildInfo.Version
	srv.Handler.BuildType = "OSS"
	ss := storage.NewStore(s.TSDBStore, s.MetaClient)
	srv.Handler.Store = ss
	if s.config.HTTPD.FluxEnabled {
		storageDep, err := influxdb2.NewDependencies(s.MetaClient, reads.NewReader(ss), authorizer, c.AuthEnabled, s.PointsWriter)
		if err != nil {
			return err
		}
		srv.Handler.Controller, err = control.New(
			s.config.FluxController,
			s.Logger.With(zap.String("service", "flux-controller")),
			[]flux.Dependency{storageDep, testing.FrameworkConfig{}},
		)
		if err != nil {
			return err
		}
		// Expose the Flux controller's metrics through the server registry.
		s.Prometheus.MustRegister(srv.Handler.Controller.PrometheusCollectors()...)
	}

	s.Services = append(s.Services, srv)
	return nil
}

// appendCollectdService adds a collectd input service if enabled.
func (s *Server) appendCollectdService(c collectd.Config) {
	if !c.Enabled {
		return
	}
	srv := collectd.NewService(c)
	srv.MetaClient = s.MetaClient
	srv.PointsWriter = s.PointsWriter
	s.Services = append(s.Services, srv)
}

// appendOpenTSDBService adds an OpenTSDB input service if enabled.
// It returns an error if the service cannot be constructed from the config.
func (s *Server) appendOpenTSDBService(c opentsdb.Config) error {
	if !c.Enabled {
		return nil
	}
	srv, err := opentsdb.NewService(c)
	if err != nil {
		return err
	}
	srv.PointsWriter = s.PointsWriter
	srv.MetaClient = s.MetaClient
	s.Services = append(s.Services, srv)
	return nil
}

// appendGraphiteService adds a Graphite input service if enabled.
// It returns an error if the service cannot be constructed from the config.
func (s *Server) appendGraphiteService(c graphite.Config) error {
	if !c.Enabled {
		return nil
	}
	srv, err := graphite.NewService(c)
	if err != nil {
		return err
	}

	srv.PointsWriter = s.PointsWriter
	srv.MetaClient = s.MetaClient
	srv.Monitor = s.Monitor
	s.Services = append(s.Services, srv)
	return nil
}

// appendPrecreatorService adds the shard precreation service if enabled.
func (s *Server) appendPrecreatorService(c precreator.Config) error {
	if !c.Enabled {
		return nil
	}
	srv := precreator.NewService(c)
	srv.MetaClient = s.MetaClient
	s.Services = append(s.Services, srv)
	return nil
}

// appendUDPService adds a UDP input service if enabled.
func (s *Server) appendUDPService(c udp.Config) {
	if !c.Enabled {
		return
	}
	srv := udp.NewService(c)
	srv.PointsWriter = s.PointsWriter
	srv.MetaClient = s.MetaClient
	s.Services = append(s.Services, srv)
}

// appendContinuousQueryService adds the continuous query service if enabled.
func (s *Server) appendContinuousQueryService(c continuous_querier.Config) {
	if !c.Enabled {
		return
	}
	srv := continuous_querier.NewService(c)
	srv.MetaClient = s.MetaClient
	srv.QueryExecutor = s.QueryExecutor
	srv.Monitor = s.Monitor
	s.Services = append(s.Services, srv)
}

// Err returns an error channel that multiplexes all out of band errors received from all services.
func (s *Server) Err() <-chan error { return s.err }

// Open opens the meta and data store and all services.
// Ordering matters: the shared TCP listener and mux come up first, then
// services are appended and configured, then the TSDB store, subscriber,
// and points writer are opened before the services themselves.
func (s *Server) Open() error {
	// Start profiling if requested.
	if err := s.startProfile(); err != nil {
		return err
	}

	// Open shared TCP connection.
	ln, err := net.Listen("tcp", s.BindAddress)
	if err != nil {
		return fmt.Errorf("listen: %s", err)
	}
	s.Listener = ln

	// Multiplex listener.
	mux := tcp.NewMux(s.MuxLogger)
	go mux.Serve(ln)

	// Append services.
	s.appendMonitorService()
	// NOTE(review): appendPrecreatorService returns an error that is
	// discarded here (currently it can only return nil) — confirm intent.
	s.appendPrecreatorService(s.config.Precreator)
	s.appendSnapshotterService()
	s.appendContinuousQueryService(s.config.ContinuousQuery)
	if err := s.appendHTTPDService(s.config.HTTPD); err != nil {
		return err
	}
	s.appendRetentionPolicyService(s.config.Retention)
	for _, i := range s.config.GraphiteInputs {
		if err := s.appendGraphiteService(i); err != nil {
			return err
		}
	}
	for _, i := range s.config.CollectdInputs {
		s.appendCollectdService(i)
	}
	for _, i := range s.config.OpenTSDBInputs {
		if err := s.appendOpenTSDBService(i); err != nil {
			return err
		}
	}
	for _, i := range s.config.UDPInputs {
		s.appendUDPService(i)
	}

	s.Subscriber.MetaClient = s.MetaClient
	s.PointsWriter.MetaClient = s.MetaClient
	s.Monitor.MetaClient = s.MetaClient

	// Route snapshotter traffic off the shared TCP mux by header byte.
	s.SnapshotterService.Listener = mux.Listen(snapshotter.MuxHeader)

	// Configure logging for all services and clients.
	if s.config.Meta.LoggingEnabled {
		s.MetaClient.WithLogger(s.Logger)
	}
	s.TSDBStore.WithLogger(s.Logger)
	if s.config.Data.QueryLogEnabled {
		s.QueryExecutor.WithLogger(s.Logger)
	}
	s.PointsWriter.WithLogger(s.Logger)
	s.Subscriber.WithLogger(s.Logger)
	for _, svc := range s.Services {
		svc.WithLogger(s.Logger)
	}
	s.SnapshotterService.WithLogger(s.Logger)
	s.Monitor.WithLogger(s.Logger)

	// Open TSDB store.
	if err := s.TSDBStore.Open(); err != nil {
		return fmt.Errorf("open tsdb store: %s", err)
	}

	// Add the subscriber before opening the PointsWriter
	s.PointsWriter.Subscriber = s.Subscriber

	// Open the subscriber service
	if err := s.Subscriber.Open(); err != nil {
		return fmt.Errorf("open subscriber: %s", err)
	}

	// Open the points writer service
	if err := s.PointsWriter.Open(); err != nil {
		return fmt.Errorf("open points writer: %s", err)
	}

	for _, service := range s.Services {
		if err := service.Open(); err != nil {
			return fmt.Errorf("open service: %s", err)
		}
	}

	// Start the reporting service, if not disabled.
	if !s.reportingDisabled {
		go s.startServerReporting()
	}

	return nil
}

// Close shuts down the meta and data stores and all services.
// It is roughly the reverse of Open: listener, services, points writer,
// executor, store, subscriber, then the meta client.
func (s *Server) Close() error {
	s.stopProfile()

	// Close the listener first to stop any new connections
	if s.Listener != nil {
		s.Listener.Close()
	}

	// Close services to allow any inflight requests to complete
	// and prevent new requests from being accepted.
	for _, service := range s.Services {
		service.Close()
	}

	s.config.deregisterDiagnostics(s.Monitor)

	if s.PointsWriter != nil {
		s.PointsWriter.Close()
	}

	if s.QueryExecutor != nil {
		s.QueryExecutor.Close()
	}

	// Close the TSDBStore, no more reads or writes at this point
	if s.TSDBStore != nil {
		s.TSDBStore.Close()
	}

	if s.Subscriber != nil {
		s.Subscriber.Close()
	}

	if s.MetaClient != nil {
		s.MetaClient.Close()
	}

	// Signal background goroutines (e.g. startServerReporting) to exit.
	close(s.closing)
	return nil
}

// startServerReporting starts periodic server reporting: one report
// immediately, then one every 24 hours until the server is closed.
func (s *Server) startServerReporting() {
	s.reportServer()

	ticker := time.NewTicker(24 * time.Hour)
	defer ticker.Stop()
	for {
		select {
		case <-s.closing:
			return
		case <-ticker.C:
			s.reportServer()
		}
	}
}

// reportServer reports usage statistics about the system to
// usage.influxdata.com: database/measurement/series counts, version,
// platform, cluster id, and uptime. Errors are logged, never fatal.
func (s *Server) reportServer() {
	dbs := s.MetaClient.Databases()
	numDatabases := len(dbs)

	var (
		numMeasurements int64
		numSeries       int64
	)

	for _, db := range dbs {
		name := db.Name
		// Use the context.Background() to avoid timing out on this.
		n, err := s.TSDBStore.SeriesCardinality(context.Background(), name)
		if err != nil {
			s.Logger.Error(fmt.Sprintf("Unable to get series cardinality for database %s: %v", name, err))
		} else {
			numSeries += n
		}

		// Use the context.Background() to avoid timing out on this.
		n, err = s.TSDBStore.MeasurementsCardinality(context.Background(), name)
		if err != nil {
			s.Logger.Error(fmt.Sprintf("Unable to get measurement cardinality for database %s: %v", name, err))
		} else {
			numMeasurements += n
		}
	}

	clusterID := s.MetaClient.ClusterID()
	cl := client.New("")
	usage := client.Usage{
		Product: "influxdb",
		Data: []client.UsageData{
			{
				Values: client.Values{
					"os":               runtime.GOOS,
					"arch":             runtime.GOARCH,
					"version":          s.buildInfo.Version,
					"cluster_id":       fmt.Sprintf("%v", clusterID),
					"num_series":       numSeries,
					"num_measurements": numMeasurements,
					"num_databases":    numDatabases,
					"uptime":           time.Since(startTime).Seconds(),
				},
			},
		},
	}

	s.Logger.Info("Sending usage statistics to usage.influxdata.com")

	// Fire-and-forget: the save result is intentionally not checked.
	go cl.Save(usage)
}

// Service represents a service attached to the server.
type Service interface {
	WithLogger(log *zap.Logger)
	Open() error
	Close() error
}

// startProfile initializes the cpu and memory profiles, if specified.
// The CPU profile starts streaming immediately; for the memory profile only
// the sample rate is raised here — the heap snapshot itself is written by
// stopProfile on shutdown.
func (s *Server) startProfile() error {
	if s.CPUProfile != "" {
		f, err := os.Create(s.CPUProfile)
		if err != nil {
			return fmt.Errorf("cpuprofile: %v", err)
		}

		s.CPUProfileWriteCloser = f
		if err := pprof.StartCPUProfile(s.CPUProfileWriteCloser); err != nil {
			return err
		}

		log.Printf("Writing CPU profile to: %s\n", s.CPUProfile)
	}

	if s.MemProfile != "" {
		f, err := os.Create(s.MemProfile)
		if err != nil {
			return fmt.Errorf("memprofile: %v", err)
		}

		s.MemProfileWriteCloser = f
		// Sample more frequently than the default for a finer-grained profile.
		runtime.MemProfileRate = 4096

		log.Printf("Writing mem profile to: %s\n", s.MemProfile)
	}

	return nil
}

// stopProfile closes the cpu and memory profiles if they are running.
658func (s *Server) stopProfile() error { 659 if s.CPUProfileWriteCloser != nil { 660 pprof.StopCPUProfile() 661 if err := s.CPUProfileWriteCloser.Close(); err != nil { 662 return err 663 } 664 log.Println("CPU profile stopped") 665 } 666 667 if s.MemProfileWriteCloser != nil { 668 pprof.Lookup("heap").WriteTo(s.MemProfileWriteCloser, 0) 669 if err := s.MemProfileWriteCloser.Close(); err != nil { 670 return err 671 } 672 log.Println("mem profile stopped") 673 } 674 675 return nil 676} 677 678// monitorPointsWriter is a wrapper around `coordinator.PointsWriter` that helps 679// to prevent a circular dependency between the `cluster` and `monitor` packages. 680type monitorPointsWriter coordinator.PointsWriter 681 682func (pw *monitorPointsWriter) WritePoints(database, retentionPolicy string, points models.Points) error { 683 writeCtx := tsdb.WriteContext{ 684 UserId: tsdb.MonitorUser, 685 } 686 return (*coordinator.PointsWriter)(pw).WritePointsPrivileged(writeCtx, database, retentionPolicy, models.ConsistencyLevelAny, points) 687} 688 689func raftDBExists(dir string) error { 690 // Check to see if there is a raft db, if so, error out with a message 691 // to downgrade, export, and then import the meta data 692 raftFile := filepath.Join(dir, "raft.db") 693 if _, err := os.Stat(raftFile); err == nil { 694 return fmt.Errorf("detected %s. To proceed, you'll need to either 1) downgrade to v0.11.x, export your metadata, upgrade to the current version again, and then import the metadata or 2) delete the file, which will effectively reset your database. For more assistance with the upgrade, see: https://docs.influxdata.com/influxdb/v0.12/administration/upgrading/", raftFile) 695 } 696 return nil 697} 698