1package run
2
3import (
4	"context"
5	"crypto/tls"
6	"fmt"
7	"io"
8	"log"
9	"net"
10	"os"
11	"path/filepath"
12	"runtime"
13	"runtime/pprof"
14	"time"
15
16	"github.com/influxdata/flux"
17	"github.com/influxdata/flux/dependencies/testing"
18	"github.com/influxdata/influxdb"
19	"github.com/influxdata/influxdb/coordinator"
20	influxdb2 "github.com/influxdata/influxdb/flux/stdlib/influxdata/influxdb"
21	"github.com/influxdata/influxdb/logger"
22	"github.com/influxdata/influxdb/models"
23	"github.com/influxdata/influxdb/monitor"
24	prometheus2 "github.com/influxdata/influxdb/prometheus"
25	"github.com/influxdata/influxdb/query"
26	"github.com/influxdata/influxdb/query/control"
27	"github.com/influxdata/influxdb/services/collectd"
28	"github.com/influxdata/influxdb/services/continuous_querier"
29	"github.com/influxdata/influxdb/services/graphite"
30	"github.com/influxdata/influxdb/services/httpd"
31	"github.com/influxdata/influxdb/services/meta"
32	"github.com/influxdata/influxdb/services/opentsdb"
33	"github.com/influxdata/influxdb/services/precreator"
34	"github.com/influxdata/influxdb/services/retention"
35	"github.com/influxdata/influxdb/services/snapshotter"
36	"github.com/influxdata/influxdb/services/storage"
37	"github.com/influxdata/influxdb/services/subscriber"
38	"github.com/influxdata/influxdb/services/udp"
39	reads "github.com/influxdata/influxdb/storage/flux"
40	"github.com/influxdata/influxdb/tcp"
41	"github.com/influxdata/influxdb/tsdb"
42
43	// Initialize the engine package
44	_ "github.com/influxdata/influxdb/tsdb/engine"
45
46	// Initialize the index package
47	_ "github.com/influxdata/influxdb/tsdb/index"
48	client "github.com/influxdata/usage-client/v1"
49	"github.com/prometheus/client_golang/prometheus"
50	"go.uber.org/zap"
51)
52
// startTime records when this process started, in UTC. It is read by
// reportServer to compute the "uptime" value sent in usage reports.
var startTime time.Time

func init() {
	startTime = time.Now().UTC()
}
58
// BuildInfo represents the build details for the server code.
type BuildInfo struct {
	Version string // release version string
	Commit  string // source-control commit the binary was built from
	Branch  string // source-control branch the binary was built from
	Time    string // build timestamp
}
66
// Server represents a container for the metadata and storage data and services.
// It is built using a Config and it manages the startup and shutdown of all
// services in the proper order.
type Server struct {
	buildInfo BuildInfo // version/commit/branch/time stamped into the binary

	err     chan error    // out-of-band errors from services, exposed via Err()
	closing chan struct{} // closed by Close() to stop background goroutines

	// BindAddress is the address of the shared TCP listener; Listener is the
	// listener itself once Open has been called.
	BindAddress string
	Listener    net.Listener

	Logger    *zap.Logger // structured logger handed to all components
	MuxLogger *log.Logger // plain logger used by the TCP mux

	MetaClient *meta.Client

	// Core data-path components.
	TSDBStore     *tsdb.Store
	QueryExecutor *query.Executor
	PointsWriter  *coordinator.PointsWriter
	Subscriber    *subscriber.Service

	// Services holds every attached service in registration order; Open and
	// Close iterate it in that order.
	Services []Service

	// Prometheus collects metrics (e.g. from the Flux controller) that are
	// folded into Statistics().
	Prometheus *prometheus.Registry

	// These references are required for the tcp muxer.
	SnapshotterService *snapshotter.Service

	Monitor *monitor.Monitor

	// Server reporting and registration
	reportingDisabled bool

	// Profiling
	CPUProfile            string
	CPUProfileWriteCloser io.WriteCloser
	MemProfile            string
	MemProfileWriteCloser io.WriteCloser

	// httpAPIAddr is the host:port combination for the main HTTP API for querying and writing data
	httpAPIAddr string

	// httpUseTLS specifies if we should use a TLS connection to the http servers
	httpUseTLS bool

	// tcpAddr is the host:port combination for the TCP listener that services mux onto
	tcpAddr string

	config *Config
}
118
// updateTLSConfig assigns with as the default value of *into. The assignment
// only happens when with is non-nil, into is a valid pointer, and no config
// has been set yet — an explicitly configured *into is never overwritten.
func updateTLSConfig(into **tls.Config, with *tls.Config) {
	if with == nil || into == nil || *into != nil {
		return
	}
	*into = with
}
126
// NewServer returns a new instance of Server built from a config.
// It parses TLS settings, prepares the meta directory, rejects legacy
// raft.db deployments, opens the meta client, and wires together the TSDB
// store, subscriber, points writer, query executor and monitor. It returns
// an error if any of those preparation steps fails.
func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
	// First grab the base tls config we will use for all clients and servers
	tlsConfig, err := c.TLS.Parse()
	if err != nil {
		return nil, fmt.Errorf("tls configuration: %v", err)
	}

	// Update the TLS values on each of the configs to be the parsed one if
	// not already specified (set the default).
	updateTLSConfig(&c.HTTPD.TLS, tlsConfig)
	updateTLSConfig(&c.Subscriber.TLS, tlsConfig)
	for i := range c.OpenTSDBInputs {
		updateTLSConfig(&c.OpenTSDBInputs[i].TLS, tlsConfig)
	}

	// We need to ensure that a meta directory always exists even if
	// we don't start the meta store.  node.json is always stored under
	// the meta directory.
	if err := os.MkdirAll(c.Meta.Dir, 0777); err != nil {
		return nil, fmt.Errorf("mkdir all: %s", err)
	}

	// 0.10-rc1 and prior would sometimes put the node.json at the root
	// dir which breaks backup/restore and restarting nodes.  This moves
	// the file from the root so it's always under the meta dir.
	oldPath := filepath.Join(filepath.Dir(c.Meta.Dir), "node.json")
	newPath := filepath.Join(c.Meta.Dir, "node.json")

	if _, err := os.Stat(oldPath); err == nil {
		if err := os.Rename(oldPath, newPath); err != nil {
			return nil, err
		}
	}

	// Load the node identity; a missing file just means a fresh node, so
	// only unexpected errors are fatal.
	_, err = influxdb.LoadNode(c.Meta.Dir)
	if err != nil {
		if !os.IsNotExist(err) {
			return nil, err
		}
	}

	// A leftover raft.db means this meta dir came from a pre-0.12 install
	// whose metadata must be migrated before this version can start.
	if err := raftDBExists(c.Meta.Dir); err != nil {
		return nil, err
	}

	// In 0.10.0 bind-address got moved to the top level. Check
	// The old location to keep things backwards compatible
	bind := c.BindAddress

	s := &Server{
		buildInfo: *buildInfo,
		err:       make(chan error),
		closing:   make(chan struct{}),

		BindAddress: bind,
		Prometheus:  prometheus.NewRegistry(),

		Logger:    logger.New(os.Stderr),
		MuxLogger: tcp.MuxLogger(os.Stderr),

		MetaClient: meta.NewClient(c.Meta),

		reportingDisabled: c.ReportingDisabled,

		httpAPIAddr: c.HTTPD.BindAddress,
		httpUseTLS:  c.HTTPD.HTTPSEnabled,
		tcpAddr:     bind,

		config: c,
	}
	s.Monitor = monitor.New(s, c.Monitor)
	s.config.registerDiagnostics(s.Monitor)

	// The meta client must be open before the components below can use it.
	if err := s.MetaClient.Open(); err != nil {
		return nil, err
	}

	s.TSDBStore = tsdb.NewStore(c.Data.Dir)
	s.TSDBStore.EngineOptions.Config = c.Data

	// Copy TSDB configuration.
	s.TSDBStore.EngineOptions.EngineVersion = c.Data.Engine
	s.TSDBStore.EngineOptions.IndexVersion = c.Data.Index

	// Create the Subscriber service
	s.Subscriber = subscriber.NewService(c.Subscriber)

	// Initialize points writer.
	s.PointsWriter = coordinator.NewPointsWriter()
	s.PointsWriter.WriteTimeout = time.Duration(c.Coordinator.WriteTimeout)
	s.PointsWriter.TSDBStore = s.TSDBStore

	// Initialize query executor.
	s.QueryExecutor = query.NewExecutor()
	s.QueryExecutor.StatementExecutor = &coordinator.StatementExecutor{
		MetaClient:  s.MetaClient,
		TaskManager: s.QueryExecutor.TaskManager,
		TSDBStore:   s.TSDBStore,
		ShardMapper: &coordinator.LocalShardMapper{
			MetaClient: s.MetaClient,
			TSDBStore:  coordinator.LocalTSDBStore{Store: s.TSDBStore},
		},
		StrictErrorHandling: s.TSDBStore.EngineOptions.Config.StrictErrorHandling,
		Monitor:             s.Monitor,
		PointsWriter:        s.PointsWriter,
		MaxSelectPointN:     c.Coordinator.MaxSelectPointN,
		MaxSelectSeriesN:    c.Coordinator.MaxSelectSeriesN,
		MaxSelectBucketsN:   c.Coordinator.MaxSelectBucketsN,
	}
	s.QueryExecutor.TaskManager.QueryTimeout = time.Duration(c.Coordinator.QueryTimeout)
	s.QueryExecutor.TaskManager.LogQueriesAfter = time.Duration(c.Coordinator.LogQueriesAfter)
	s.QueryExecutor.TaskManager.MaxConcurrentQueries = c.Coordinator.MaxConcurrentQueries

	// Initialize the monitor
	s.Monitor.Version = s.buildInfo.Version
	s.Monitor.Commit = s.buildInfo.Commit
	s.Monitor.Branch = s.buildInfo.Branch
	s.Monitor.BuildTime = s.buildInfo.Time
	// The wrapper type prevents a circular dependency between the
	// coordinator and monitor packages; see monitorPointsWriter below.
	s.Monitor.PointsWriter = (*monitorPointsWriter)(s.PointsWriter)
	return s, nil
}
249
250// Statistics returns statistics for the services running in the Server.
251func (s *Server) Statistics(tags map[string]string) []models.Statistic {
252	var statistics []models.Statistic
253	statistics = append(statistics, s.QueryExecutor.Statistics(tags)...)
254	statistics = append(statistics, s.TSDBStore.Statistics(tags)...)
255	statistics = append(statistics, s.PointsWriter.Statistics(tags)...)
256	statistics = append(statistics, s.Subscriber.Statistics(tags)...)
257	for _, srv := range s.Services {
258		if m, ok := srv.(monitor.Reporter); ok {
259			statistics = append(statistics, m.Statistics(tags)...)
260		}
261	}
262	metricFamily, err := s.Prometheus.Gather()
263	if err == nil {
264		statistics = append(statistics, prometheus2.PrometheusToStatistics(metricFamily, tags)...)
265	}
266	return statistics
267}
268
269func (s *Server) appendSnapshotterService() {
270	srv := snapshotter.NewService()
271	srv.TSDBStore = s.TSDBStore
272	srv.MetaClient = s.MetaClient
273	s.Services = append(s.Services, srv)
274	s.SnapshotterService = srv
275}
276
277// SetLogOutput sets the logger used for all messages. It must not be called
278// after the Open method has been called.
279func (s *Server) SetLogOutput(w io.Writer) {
280	s.Logger = logger.New(w)
281	s.MuxLogger = tcp.MuxLogger(w)
282}
283
// appendMonitorService registers the already-constructed Monitor as a regular
// service so it participates in the Open/Close lifecycle.
func (s *Server) appendMonitorService() {
	s.Services = append(s.Services, s.Monitor)
}
287
288func (s *Server) appendRetentionPolicyService(c retention.Config) {
289	if !c.Enabled {
290		return
291	}
292	srv := retention.NewService(c)
293	srv.MetaClient = s.MetaClient
294	srv.TSDBStore = s.TSDBStore
295	s.Services = append(s.Services, srv)
296}
297
298func (s *Server) appendHTTPDService(c httpd.Config) error {
299	if !c.Enabled {
300		return nil
301	}
302	srv := httpd.NewService(c)
303	srv.Handler.MetaClient = s.MetaClient
304	authorizer := meta.NewQueryAuthorizer(s.MetaClient)
305	srv.Handler.QueryAuthorizer = authorizer
306	srv.Handler.WriteAuthorizer = meta.NewWriteAuthorizer(s.MetaClient)
307	srv.Handler.QueryExecutor = s.QueryExecutor
308	srv.Handler.Monitor = s.Monitor
309	srv.Handler.PointsWriter = s.PointsWriter
310	srv.Handler.Version = s.buildInfo.Version
311	srv.Handler.BuildType = "OSS"
312	ss := storage.NewStore(s.TSDBStore, s.MetaClient)
313	srv.Handler.Store = ss
314	if s.config.HTTPD.FluxEnabled {
315		storageDep, err := influxdb2.NewDependencies(s.MetaClient, reads.NewReader(ss), authorizer, c.AuthEnabled, s.PointsWriter)
316		if err != nil {
317			return err
318		}
319		srv.Handler.Controller, err = control.New(
320			s.config.FluxController,
321			s.Logger.With(zap.String("service", "flux-controller")),
322			[]flux.Dependency{storageDep, testing.FrameworkConfig{}},
323		)
324		if err != nil {
325			return err
326		}
327		s.Prometheus.MustRegister(srv.Handler.Controller.PrometheusCollectors()...)
328	}
329
330	s.Services = append(s.Services, srv)
331	return nil
332}
333
334func (s *Server) appendCollectdService(c collectd.Config) {
335	if !c.Enabled {
336		return
337	}
338	srv := collectd.NewService(c)
339	srv.MetaClient = s.MetaClient
340	srv.PointsWriter = s.PointsWriter
341	s.Services = append(s.Services, srv)
342}
343
344func (s *Server) appendOpenTSDBService(c opentsdb.Config) error {
345	if !c.Enabled {
346		return nil
347	}
348	srv, err := opentsdb.NewService(c)
349	if err != nil {
350		return err
351	}
352	srv.PointsWriter = s.PointsWriter
353	srv.MetaClient = s.MetaClient
354	s.Services = append(s.Services, srv)
355	return nil
356}
357
358func (s *Server) appendGraphiteService(c graphite.Config) error {
359	if !c.Enabled {
360		return nil
361	}
362	srv, err := graphite.NewService(c)
363	if err != nil {
364		return err
365	}
366
367	srv.PointsWriter = s.PointsWriter
368	srv.MetaClient = s.MetaClient
369	srv.Monitor = s.Monitor
370	s.Services = append(s.Services, srv)
371	return nil
372}
373
374func (s *Server) appendPrecreatorService(c precreator.Config) error {
375	if !c.Enabled {
376		return nil
377	}
378	srv := precreator.NewService(c)
379	srv.MetaClient = s.MetaClient
380	s.Services = append(s.Services, srv)
381	return nil
382}
383
384func (s *Server) appendUDPService(c udp.Config) {
385	if !c.Enabled {
386		return
387	}
388	srv := udp.NewService(c)
389	srv.PointsWriter = s.PointsWriter
390	srv.MetaClient = s.MetaClient
391	s.Services = append(s.Services, srv)
392}
393
394func (s *Server) appendContinuousQueryService(c continuous_querier.Config) {
395	if !c.Enabled {
396		return
397	}
398	srv := continuous_querier.NewService(c)
399	srv.MetaClient = s.MetaClient
400	srv.QueryExecutor = s.QueryExecutor
401	srv.Monitor = s.Monitor
402	s.Services = append(s.Services, srv)
403}
404
// Err returns an error channel that multiplexes all out of band errors received from all services.
func (s *Server) Err() <-chan error { return s.err }
407
408// Open opens the meta and data store and all services.
409func (s *Server) Open() error {
410	// Start profiling if requested.
411	if err := s.startProfile(); err != nil {
412		return err
413	}
414
415	// Open shared TCP connection.
416	ln, err := net.Listen("tcp", s.BindAddress)
417	if err != nil {
418		return fmt.Errorf("listen: %s", err)
419	}
420	s.Listener = ln
421
422	// Multiplex listener.
423	mux := tcp.NewMux(s.MuxLogger)
424	go mux.Serve(ln)
425
426	// Append services.
427	s.appendMonitorService()
428	s.appendPrecreatorService(s.config.Precreator)
429	s.appendSnapshotterService()
430	s.appendContinuousQueryService(s.config.ContinuousQuery)
431	if err := s.appendHTTPDService(s.config.HTTPD); err != nil {
432		return err
433	}
434	s.appendRetentionPolicyService(s.config.Retention)
435	for _, i := range s.config.GraphiteInputs {
436		if err := s.appendGraphiteService(i); err != nil {
437			return err
438		}
439	}
440	for _, i := range s.config.CollectdInputs {
441		s.appendCollectdService(i)
442	}
443	for _, i := range s.config.OpenTSDBInputs {
444		if err := s.appendOpenTSDBService(i); err != nil {
445			return err
446		}
447	}
448	for _, i := range s.config.UDPInputs {
449		s.appendUDPService(i)
450	}
451
452	s.Subscriber.MetaClient = s.MetaClient
453	s.PointsWriter.MetaClient = s.MetaClient
454	s.Monitor.MetaClient = s.MetaClient
455
456	s.SnapshotterService.Listener = mux.Listen(snapshotter.MuxHeader)
457
458	// Configure logging for all services and clients.
459	if s.config.Meta.LoggingEnabled {
460		s.MetaClient.WithLogger(s.Logger)
461	}
462	s.TSDBStore.WithLogger(s.Logger)
463	if s.config.Data.QueryLogEnabled {
464		s.QueryExecutor.WithLogger(s.Logger)
465	}
466	s.PointsWriter.WithLogger(s.Logger)
467	s.Subscriber.WithLogger(s.Logger)
468	for _, svc := range s.Services {
469		svc.WithLogger(s.Logger)
470	}
471	s.SnapshotterService.WithLogger(s.Logger)
472	s.Monitor.WithLogger(s.Logger)
473
474	// Open TSDB store.
475	if err := s.TSDBStore.Open(); err != nil {
476		return fmt.Errorf("open tsdb store: %s", err)
477	}
478
479	// Add the subscriber before opening the PointsWriter
480	s.PointsWriter.Subscriber = s.Subscriber
481
482	// Open the subscriber service
483	if err := s.Subscriber.Open(); err != nil {
484		return fmt.Errorf("open subscriber: %s", err)
485	}
486
487	// Open the points writer service
488	if err := s.PointsWriter.Open(); err != nil {
489		return fmt.Errorf("open points writer: %s", err)
490	}
491
492	for _, service := range s.Services {
493		if err := service.Open(); err != nil {
494			return fmt.Errorf("open service: %s", err)
495		}
496	}
497
498	// Start the reporting service, if not disabled.
499	if !s.reportingDisabled {
500		go s.startServerReporting()
501	}
502
503	return nil
504}
505
// Close shuts down the meta and data stores and all services.
// Shutdown proceeds roughly in reverse dependency order: the listener stops
// accepting connections, services close, then the points writer, query
// executor, TSDB store, subscriber and meta client. Errors from individual
// components are deliberately ignored so shutdown always runs to completion;
// the method always returns nil.
func (s *Server) Close() error {
	s.stopProfile()

	// Close the listener first to stop any new connections
	if s.Listener != nil {
		s.Listener.Close()
	}

	// Close services to allow any inflight requests to complete
	// and prevent new requests from being accepted.
	for _, service := range s.Services {
		service.Close()
	}

	s.config.deregisterDiagnostics(s.Monitor)

	if s.PointsWriter != nil {
		s.PointsWriter.Close()
	}

	if s.QueryExecutor != nil {
		s.QueryExecutor.Close()
	}

	// Close the TSDBStore, no more reads or writes at this point
	if s.TSDBStore != nil {
		s.TSDBStore.Close()
	}

	if s.Subscriber != nil {
		s.Subscriber.Close()
	}

	if s.MetaClient != nil {
		s.MetaClient.Close()
	}

	// Signal background goroutines (e.g. startServerReporting) to exit.
	close(s.closing)
	return nil
}
547
// startServerReporting starts periodic server reporting.
// It reports once immediately, then every 24 hours, until s.closing is
// closed. It is intended to run in its own goroutine (see Open).
func (s *Server) startServerReporting() {
	s.reportServer()

	ticker := time.NewTicker(24 * time.Hour)
	defer ticker.Stop()
	for {
		select {
		case <-s.closing:
			return
		case <-ticker.C:
			s.reportServer()
		}
	}
}
563
// reportServer reports usage statistics about the system.
// It gathers database, series and measurement counts from the TSDB store and
// submits them (with build/OS info and uptime) to usage.influxdata.com.
// Cardinality failures for a single database are logged and skipped rather
// than aborting the report; the upload itself is best-effort in a goroutine.
func (s *Server) reportServer() {
	dbs := s.MetaClient.Databases()
	numDatabases := len(dbs)

	var (
		numMeasurements int64
		numSeries       int64
	)

	for _, db := range dbs {
		name := db.Name
		// Use the context.Background() to avoid timing out on this.
		n, err := s.TSDBStore.SeriesCardinality(context.Background(), name)
		if err != nil {
			s.Logger.Error(fmt.Sprintf("Unable to get series cardinality for database %s: %v", name, err))
		} else {
			numSeries += n
		}

		// Use the context.Background() to avoid timing out on this.
		n, err = s.TSDBStore.MeasurementsCardinality(context.Background(), name)
		if err != nil {
			s.Logger.Error(fmt.Sprintf("Unable to get measurement cardinality for database %s: %v", name, err))
		} else {
			numMeasurements += n
		}
	}

	clusterID := s.MetaClient.ClusterID()
	cl := client.New("")
	usage := client.Usage{
		Product: "influxdb",
		Data: []client.UsageData{
			{
				Values: client.Values{
					"os":               runtime.GOOS,
					"arch":             runtime.GOARCH,
					"version":          s.buildInfo.Version,
					"cluster_id":       fmt.Sprintf("%v", clusterID),
					"num_series":       numSeries,
					"num_measurements": numMeasurements,
					"num_databases":    numDatabases,
					"uptime":           time.Since(startTime).Seconds(),
				},
			},
		},
	}

	s.Logger.Info("Sending usage statistics to usage.influxdata.com")

	// Fire-and-forget: any error from the upload is intentionally dropped.
	go cl.Save(usage)
}
617
// Service represents a service attached to the server.
type Service interface {
	// WithLogger configures the service's logger; Open wires this before
	// opening any service.
	WithLogger(log *zap.Logger)
	// Open starts the service.
	Open() error
	// Close shuts the service down.
	Close() error
}
624
// startProfile initializes CPU and memory profiling when s.CPUProfile and/or
// s.MemProfile name output files. The CPU profile begins streaming to its
// file immediately; for the memory profile this only creates the file and
// raises the sampling rate — the heap snapshot itself is written later by
// stopProfile. Returns an error if either output file cannot be created or
// the CPU profile cannot start.
func (s *Server) startProfile() error {
	if s.CPUProfile != "" {
		f, err := os.Create(s.CPUProfile)
		if err != nil {
			return fmt.Errorf("cpuprofile: %v", err)
		}

		s.CPUProfileWriteCloser = f
		if err := pprof.StartCPUProfile(s.CPUProfileWriteCloser); err != nil {
			return err
		}

		log.Printf("Writing CPU profile to: %s\n", s.CPUProfile)
	}

	if s.MemProfile != "" {
		f, err := os.Create(s.MemProfile)
		if err != nil {
			return fmt.Errorf("memprofile: %v", err)
		}

		s.MemProfileWriteCloser = f
		// Sample more of the heap than the default rate for a more detailed
		// profile; the snapshot is taken in stopProfile.
		runtime.MemProfileRate = 4096

		log.Printf("Writing mem profile to: %s\n", s.MemProfile)
	}

	return nil
}
656
657// StopProfile closes the cpu and memory profiles if they are running.
658func (s *Server) stopProfile() error {
659	if s.CPUProfileWriteCloser != nil {
660		pprof.StopCPUProfile()
661		if err := s.CPUProfileWriteCloser.Close(); err != nil {
662			return err
663		}
664		log.Println("CPU profile stopped")
665	}
666
667	if s.MemProfileWriteCloser != nil {
668		pprof.Lookup("heap").WriteTo(s.MemProfileWriteCloser, 0)
669		if err := s.MemProfileWriteCloser.Close(); err != nil {
670			return err
671		}
672		log.Println("mem profile stopped")
673	}
674
675	return nil
676}
677
// monitorPointsWriter is a wrapper around `coordinator.PointsWriter` that helps
// to prevent a circular dependency between the `cluster` and `monitor` packages.
type monitorPointsWriter coordinator.PointsWriter

// WritePoints writes monitor points on behalf of the monitor user using the
// privileged write path with "any" consistency.
func (pw *monitorPointsWriter) WritePoints(database, retentionPolicy string, points models.Points) error {
	writeCtx := tsdb.WriteContext{
		UserId: tsdb.MonitorUser,
	}
	return (*coordinator.PointsWriter)(pw).WritePointsPrivileged(writeCtx, database, retentionPolicy, models.ConsistencyLevelAny, points)
}
688
// raftDBExists reports (as an error) whether a legacy raft.db file is present
// in dir. Such a file comes from a pre-0.12 deployment whose metadata must be
// migrated before this version can start; a nil return means no raft.db was
// found and startup may proceed.
func raftDBExists(dir string) error {
	raftFile := filepath.Join(dir, "raft.db")
	if _, err := os.Stat(raftFile); err != nil {
		// No raft.db (or it is unreadable): nothing blocking startup.
		return nil
	}
	return fmt.Errorf("detected %s. To proceed, you'll need to either 1) downgrade to v0.11.x, export your metadata, upgrade to the current version again, and then import the metadata or 2) delete the file, which will effectively reset your database. For more assistance with the upgrade, see: https://docs.influxdata.com/influxdb/v0.12/administration/upgrading/", raftFile)
}
698