1package run
2
3import (
4	"crypto/tls"
5	"fmt"
6	"io"
7	"log"
8	"net"
9	"os"
10	"path/filepath"
11	"runtime"
12	"runtime/pprof"
13	"time"
14
15	"github.com/influxdata/influxdb"
16	"github.com/influxdata/influxdb/coordinator"
17	"github.com/influxdata/influxdb/flux/control"
18	"github.com/influxdata/influxdb/logger"
19	"github.com/influxdata/influxdb/models"
20	"github.com/influxdata/influxdb/monitor"
21	"github.com/influxdata/influxdb/query"
22	"github.com/influxdata/influxdb/services/collectd"
23	"github.com/influxdata/influxdb/services/continuous_querier"
24	"github.com/influxdata/influxdb/services/graphite"
25	"github.com/influxdata/influxdb/services/httpd"
26	"github.com/influxdata/influxdb/services/meta"
27	"github.com/influxdata/influxdb/services/opentsdb"
28	"github.com/influxdata/influxdb/services/precreator"
29	"github.com/influxdata/influxdb/services/retention"
30	"github.com/influxdata/influxdb/services/snapshotter"
31	"github.com/influxdata/influxdb/services/storage"
32	"github.com/influxdata/influxdb/services/subscriber"
33	"github.com/influxdata/influxdb/services/udp"
34	"github.com/influxdata/influxdb/storage/reads"
35	"github.com/influxdata/influxdb/tcp"
36	"github.com/influxdata/influxdb/tsdb"
37	client "github.com/influxdata/usage-client/v1"
38	"go.uber.org/zap"
39
40	// Initialize the engine package
41	_ "github.com/influxdata/influxdb/tsdb/engine"
42	// Initialize the index package
43	_ "github.com/influxdata/influxdb/tsdb/index"
44)
45
// startTime is the UTC instant at which this package was initialized. It is
// the reference point for the uptime value computed in reportServer.
var startTime time.Time

// init stamps startTime once, when the package is loaded.
func init() {
	now := time.Now()
	startTime = now.UTC()
}
51
// BuildInfo describes the build of the server code. NewServer copies it into
// the Server and forwards the individual values to the monitor.
type BuildInfo struct {
	Version string // version string; also given to the HTTP handler and the usage report
	Commit  string // commit identifier, forwarded to the monitor
	Branch  string // branch name, forwarded to the monitor
	Time    string // build time, forwarded to the monitor as BuildTime
}
59
// Server represents a container for the metadata and storage data and services.
// It is built using a Config and it manages the startup and shutdown of all
// services in the proper order.
//
// A Server is created with NewServer, started with Open and stopped with Close.
type Server struct {
	// buildInfo is a copy of the build details handed to NewServer.
	buildInfo BuildInfo

	// err backs the channel returned by Err. closing is closed at the end of
	// Close and stops the periodic usage-reporting loop.
	err     chan error
	closing chan struct{}

	// BindAddress is the address Open listens on for the shared TCP listener;
	// Listener holds that listener once Open has created it.
	BindAddress string
	Listener    net.Listener

	// Logger is handed to the stores and to every attached service during Open.
	Logger *zap.Logger

	// MetaClient is created and opened by NewServer.
	MetaClient *meta.Client

	// Core components constructed by NewServer and opened by Open.
	TSDBStore     *tsdb.Store
	QueryExecutor *query.Executor
	PointsWriter  *coordinator.PointsWriter
	Subscriber    *subscriber.Service

	// Services holds the services appended during Open. Open opens them and
	// Close closes them in slice order.
	Services []Service

	// These references are required for the tcp muxer.
	SnapshotterService *snapshotter.Service

	// Monitor is created by NewServer and is itself appended to Services in Open.
	Monitor *monitor.Monitor

	// Server reporting and registration. When reportingDisabled is true, Open
	// does not start the usage-reporting goroutine.
	reportingDisabled bool

	// Profiling. Non-empty paths make Open start the corresponding profile.
	CPUProfile string
	MemProfile string

	// httpAPIAddr is the host:port combination for the main HTTP API for querying and writing data
	httpAPIAddr string

	// httpUseTLS specifies if we should use a TLS connection to the http servers
	httpUseTLS bool

	// tcpAddr is the host:port combination for the TCP listener that services mux onto
	tcpAddr string

	// config is the Config the server was built from; Open reads the
	// per-service settings from it.
	config *Config
}
106
// updateTLSConfig installs with as the default TLS configuration for the slot
// that into points at. The slot is written only when with is non-nil, into
// itself is non-nil, and the slot is currently empty (*into == nil); in every
// other case the call is a no-op.
func updateTLSConfig(into **tls.Config, with *tls.Config) {
	// Nothing to install, or nowhere to install it.
	if with == nil || into == nil {
		return
	}
	// An explicitly configured value always wins over the default.
	if *into != nil {
		return
	}
	*into = with
}
114
// NewServer returns a new instance of Server built from a config.
//
// c is retained by the returned Server, and its HTTPD, Subscriber and OpenTSDB
// TLS settings are filled in from the parsed top-level TLS configuration when
// they are unset. buildInfo must be non-nil because it is dereferenced when
// the Server value is built.
//
// NewServer creates the meta directory, relocates a node.json found in its
// parent directory, refuses to continue when a raft.db file is present, and
// opens the meta client. The TSDB store, subscriber, points writer and query
// executor are constructed but not opened; that happens in Open.
func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
	// First grab the base tls config we will use for all clients and servers
	tlsConfig, err := c.TLS.Parse()
	if err != nil {
		return nil, fmt.Errorf("tls configuration: %v", err)
	}

	// Update the TLS values on each of the configs to be the parsed one if
	// not already specified (set the default).
	updateTLSConfig(&c.HTTPD.TLS, tlsConfig)
	updateTLSConfig(&c.Subscriber.TLS, tlsConfig)
	for i := range c.OpenTSDBInputs {
		// Index into the slice so the element itself is updated, not a copy.
		updateTLSConfig(&c.OpenTSDBInputs[i].TLS, tlsConfig)
	}

	// We need to ensure that a meta directory always exists even if
	// we don't start the meta store.  node.json is always stored under
	// the meta directory.
	if err := os.MkdirAll(c.Meta.Dir, 0777); err != nil {
		return nil, fmt.Errorf("mkdir all: %s", err)
	}

	// 0.10-rc1 and prior would sometimes put the node.json at the root
	// dir which breaks backup/restore and restarting nodes.  This moves
	// the file from the root so it's always under the meta dir.
	oldPath := filepath.Join(filepath.Dir(c.Meta.Dir), "node.json")
	newPath := filepath.Join(c.Meta.Dir, "node.json")

	// Only a successful Stat triggers the move; any Stat error is treated as
	// "nothing to migrate".
	if _, err := os.Stat(oldPath); err == nil {
		if err := os.Rename(oldPath, newPath); err != nil {
			return nil, err
		}
	}

	// The loaded node is discarded; the call is made only for its error.
	// A missing node file is acceptable, any other error aborts construction.
	_, err = influxdb.LoadNode(c.Meta.Dir)
	if err != nil {
		if !os.IsNotExist(err) {
			return nil, err
		}
	}

	// A leftover raft.db means the metadata predates this format; refuse to start.
	if err := raftDBExists(c.Meta.Dir); err != nil {
		return nil, err
	}

	// In 0.10.0 bind-address got moved to the top level. Only the top-level
	// value is read here; it is used for both BindAddress and tcpAddr.
	bind := c.BindAddress

	s := &Server{
		buildInfo: *buildInfo,
		err:       make(chan error),
		closing:   make(chan struct{}),

		BindAddress: bind,

		Logger: logger.New(os.Stderr),

		MetaClient: meta.NewClient(c.Meta),

		reportingDisabled: c.ReportingDisabled,

		httpAPIAddr: c.HTTPD.BindAddress,
		httpUseTLS:  c.HTTPD.HTTPSEnabled,
		tcpAddr:     bind,

		config: c,
	}
	// The monitor is given the server itself and the config registers its
	// diagnostics with it.
	s.Monitor = monitor.New(s, c.Monitor)
	s.config.registerDiagnostics(s.Monitor)

	// The meta client is the only component opened during construction.
	if err := s.MetaClient.Open(); err != nil {
		return nil, err
	}

	s.TSDBStore = tsdb.NewStore(c.Data.Dir)
	s.TSDBStore.EngineOptions.Config = c.Data

	// Copy TSDB configuration.
	s.TSDBStore.EngineOptions.EngineVersion = c.Data.Engine
	s.TSDBStore.EngineOptions.IndexVersion = c.Data.Index

	// Create the Subscriber service
	s.Subscriber = subscriber.NewService(c.Subscriber)

	// Initialize points writer.
	s.PointsWriter = coordinator.NewPointsWriter()
	s.PointsWriter.WriteTimeout = time.Duration(c.Coordinator.WriteTimeout)
	s.PointsWriter.TSDBStore = s.TSDBStore

	// Initialize query executor.
	s.QueryExecutor = query.NewExecutor()
	s.QueryExecutor.StatementExecutor = &coordinator.StatementExecutor{
		MetaClient:  s.MetaClient,
		TaskManager: s.QueryExecutor.TaskManager,
		TSDBStore:   s.TSDBStore,
		ShardMapper: &coordinator.LocalShardMapper{
			MetaClient: s.MetaClient,
			TSDBStore:  coordinator.LocalTSDBStore{Store: s.TSDBStore},
		},
		Monitor:           s.Monitor,
		PointsWriter:      s.PointsWriter,
		MaxSelectPointN:   c.Coordinator.MaxSelectPointN,
		MaxSelectSeriesN:  c.Coordinator.MaxSelectSeriesN,
		MaxSelectBucketsN: c.Coordinator.MaxSelectBucketsN,
	}
	s.QueryExecutor.TaskManager.QueryTimeout = time.Duration(c.Coordinator.QueryTimeout)
	s.QueryExecutor.TaskManager.LogQueriesAfter = time.Duration(c.Coordinator.LogQueriesAfter)
	s.QueryExecutor.TaskManager.MaxConcurrentQueries = c.Coordinator.MaxConcurrentQueries

	// Initialize the monitor
	s.Monitor.Version = s.buildInfo.Version
	s.Monitor.Commit = s.buildInfo.Commit
	s.Monitor.Branch = s.buildInfo.Branch
	s.Monitor.BuildTime = s.buildInfo.Time
	// The monitor writes through the points writer via the monitorPointsWriter
	// conversion.
	s.Monitor.PointsWriter = (*monitorPointsWriter)(s.PointsWriter)
	return s, nil
}
234
235// Statistics returns statistics for the services running in the Server.
236func (s *Server) Statistics(tags map[string]string) []models.Statistic {
237	var statistics []models.Statistic
238	statistics = append(statistics, s.QueryExecutor.Statistics(tags)...)
239	statistics = append(statistics, s.TSDBStore.Statistics(tags)...)
240	statistics = append(statistics, s.PointsWriter.Statistics(tags)...)
241	statistics = append(statistics, s.Subscriber.Statistics(tags)...)
242	for _, srv := range s.Services {
243		if m, ok := srv.(monitor.Reporter); ok {
244			statistics = append(statistics, m.Statistics(tags)...)
245		}
246	}
247	return statistics
248}
249
250func (s *Server) appendSnapshotterService() {
251	srv := snapshotter.NewService()
252	srv.TSDBStore = s.TSDBStore
253	srv.MetaClient = s.MetaClient
254	s.Services = append(s.Services, srv)
255	s.SnapshotterService = srv
256}
257
258// SetLogOutput sets the logger used for all messages. It must not be called
259// after the Open method has been called.
260func (s *Server) SetLogOutput(w io.Writer) {
261	s.Logger = logger.New(w)
262}
263
264func (s *Server) appendMonitorService() {
265	s.Services = append(s.Services, s.Monitor)
266}
267
268func (s *Server) appendRetentionPolicyService(c retention.Config) {
269	if !c.Enabled {
270		return
271	}
272	srv := retention.NewService(c)
273	srv.MetaClient = s.MetaClient
274	srv.TSDBStore = s.TSDBStore
275	s.Services = append(s.Services, srv)
276}
277
278func (s *Server) appendHTTPDService(c httpd.Config) {
279	if !c.Enabled {
280		return
281	}
282	srv := httpd.NewService(c)
283	srv.Handler.MetaClient = s.MetaClient
284	authorizer := meta.NewQueryAuthorizer(s.MetaClient)
285	srv.Handler.QueryAuthorizer = authorizer
286	srv.Handler.WriteAuthorizer = meta.NewWriteAuthorizer(s.MetaClient)
287	srv.Handler.QueryExecutor = s.QueryExecutor
288	srv.Handler.Monitor = s.Monitor
289	srv.Handler.PointsWriter = s.PointsWriter
290	srv.Handler.Version = s.buildInfo.Version
291	srv.Handler.BuildType = "OSS"
292	ss := storage.NewStore(s.TSDBStore, s.MetaClient)
293	srv.Handler.Store = ss
294	srv.Handler.Controller = control.NewController(s.MetaClient, reads.NewReader(ss), authorizer, c.AuthEnabled, s.Logger)
295
296	s.Services = append(s.Services, srv)
297}
298
299func (s *Server) appendCollectdService(c collectd.Config) {
300	if !c.Enabled {
301		return
302	}
303	srv := collectd.NewService(c)
304	srv.MetaClient = s.MetaClient
305	srv.PointsWriter = s.PointsWriter
306	s.Services = append(s.Services, srv)
307}
308
309func (s *Server) appendOpenTSDBService(c opentsdb.Config) error {
310	if !c.Enabled {
311		return nil
312	}
313	srv, err := opentsdb.NewService(c)
314	if err != nil {
315		return err
316	}
317	srv.PointsWriter = s.PointsWriter
318	srv.MetaClient = s.MetaClient
319	s.Services = append(s.Services, srv)
320	return nil
321}
322
323func (s *Server) appendGraphiteService(c graphite.Config) error {
324	if !c.Enabled {
325		return nil
326	}
327	srv, err := graphite.NewService(c)
328	if err != nil {
329		return err
330	}
331
332	srv.PointsWriter = s.PointsWriter
333	srv.MetaClient = s.MetaClient
334	srv.Monitor = s.Monitor
335	s.Services = append(s.Services, srv)
336	return nil
337}
338
339func (s *Server) appendPrecreatorService(c precreator.Config) error {
340	if !c.Enabled {
341		return nil
342	}
343	srv := precreator.NewService(c)
344	srv.MetaClient = s.MetaClient
345	s.Services = append(s.Services, srv)
346	return nil
347}
348
349func (s *Server) appendUDPService(c udp.Config) {
350	if !c.Enabled {
351		return
352	}
353	srv := udp.NewService(c)
354	srv.PointsWriter = s.PointsWriter
355	srv.MetaClient = s.MetaClient
356	s.Services = append(s.Services, srv)
357}
358
359func (s *Server) appendContinuousQueryService(c continuous_querier.Config) {
360	if !c.Enabled {
361		return
362	}
363	srv := continuous_querier.NewService(c)
364	srv.MetaClient = s.MetaClient
365	srv.QueryExecutor = s.QueryExecutor
366	srv.Monitor = s.Monitor
367	s.Services = append(s.Services, srv)
368}
369
370// Err returns an error channel that multiplexes all out of band errors received from all services.
371func (s *Server) Err() <-chan error { return s.err }
372
373// Open opens the meta and data store and all services.
374func (s *Server) Open() error {
375	// Start profiling, if set.
376	startProfile(s.CPUProfile, s.MemProfile)
377
378	// Open shared TCP connection.
379	ln, err := net.Listen("tcp", s.BindAddress)
380	if err != nil {
381		return fmt.Errorf("listen: %s", err)
382	}
383	s.Listener = ln
384
385	// Multiplex listener.
386	mux := tcp.NewMux()
387	go mux.Serve(ln)
388
389	// Append services.
390	s.appendMonitorService()
391	s.appendPrecreatorService(s.config.Precreator)
392	s.appendSnapshotterService()
393	s.appendContinuousQueryService(s.config.ContinuousQuery)
394	s.appendHTTPDService(s.config.HTTPD)
395	s.appendRetentionPolicyService(s.config.Retention)
396	for _, i := range s.config.GraphiteInputs {
397		if err := s.appendGraphiteService(i); err != nil {
398			return err
399		}
400	}
401	for _, i := range s.config.CollectdInputs {
402		s.appendCollectdService(i)
403	}
404	for _, i := range s.config.OpenTSDBInputs {
405		if err := s.appendOpenTSDBService(i); err != nil {
406			return err
407		}
408	}
409	for _, i := range s.config.UDPInputs {
410		s.appendUDPService(i)
411	}
412
413	s.Subscriber.MetaClient = s.MetaClient
414	s.PointsWriter.MetaClient = s.MetaClient
415	s.Monitor.MetaClient = s.MetaClient
416
417	s.SnapshotterService.Listener = mux.Listen(snapshotter.MuxHeader)
418
419	// Configure logging for all services and clients.
420	if s.config.Meta.LoggingEnabled {
421		s.MetaClient.WithLogger(s.Logger)
422	}
423	s.TSDBStore.WithLogger(s.Logger)
424	if s.config.Data.QueryLogEnabled {
425		s.QueryExecutor.WithLogger(s.Logger)
426	}
427	s.PointsWriter.WithLogger(s.Logger)
428	s.Subscriber.WithLogger(s.Logger)
429	for _, svc := range s.Services {
430		svc.WithLogger(s.Logger)
431	}
432	s.SnapshotterService.WithLogger(s.Logger)
433	s.Monitor.WithLogger(s.Logger)
434
435	// Open TSDB store.
436	if err := s.TSDBStore.Open(); err != nil {
437		return fmt.Errorf("open tsdb store: %s", err)
438	}
439
440	// Open the subscriber service
441	if err := s.Subscriber.Open(); err != nil {
442		return fmt.Errorf("open subscriber: %s", err)
443	}
444
445	// Open the points writer service
446	if err := s.PointsWriter.Open(); err != nil {
447		return fmt.Errorf("open points writer: %s", err)
448	}
449
450	s.PointsWriter.AddWriteSubscriber(s.Subscriber.Points())
451
452	for _, service := range s.Services {
453		if err := service.Open(); err != nil {
454			return fmt.Errorf("open service: %s", err)
455		}
456	}
457
458	// Start the reporting service, if not disabled.
459	if !s.reportingDisabled {
460		go s.startServerReporting()
461	}
462
463	return nil
464}
465
466// Close shuts down the meta and data stores and all services.
467func (s *Server) Close() error {
468	stopProfile()
469
470	// Close the listener first to stop any new connections
471	if s.Listener != nil {
472		s.Listener.Close()
473	}
474
475	// Close services to allow any inflight requests to complete
476	// and prevent new requests from being accepted.
477	for _, service := range s.Services {
478		service.Close()
479	}
480
481	s.config.deregisterDiagnostics(s.Monitor)
482
483	if s.PointsWriter != nil {
484		s.PointsWriter.Close()
485	}
486
487	if s.QueryExecutor != nil {
488		s.QueryExecutor.Close()
489	}
490
491	// Close the TSDBStore, no more reads or writes at this point
492	if s.TSDBStore != nil {
493		s.TSDBStore.Close()
494	}
495
496	if s.Subscriber != nil {
497		s.Subscriber.Close()
498	}
499
500	if s.MetaClient != nil {
501		s.MetaClient.Close()
502	}
503
504	close(s.closing)
505	return nil
506}
507
508// startServerReporting starts periodic server reporting.
509func (s *Server) startServerReporting() {
510	s.reportServer()
511
512	ticker := time.NewTicker(24 * time.Hour)
513	defer ticker.Stop()
514	for {
515		select {
516		case <-s.closing:
517			return
518		case <-ticker.C:
519			s.reportServer()
520		}
521	}
522}
523
524// reportServer reports usage statistics about the system.
525func (s *Server) reportServer() {
526	dbs := s.MetaClient.Databases()
527	numDatabases := len(dbs)
528
529	var (
530		numMeasurements int64
531		numSeries       int64
532	)
533
534	for _, db := range dbs {
535		name := db.Name
536		n, err := s.TSDBStore.SeriesCardinality(name)
537		if err != nil {
538			s.Logger.Error(fmt.Sprintf("Unable to get series cardinality for database %s: %v", name, err))
539		} else {
540			numSeries += n
541		}
542
543		n, err = s.TSDBStore.MeasurementsCardinality(name)
544		if err != nil {
545			s.Logger.Error(fmt.Sprintf("Unable to get measurement cardinality for database %s: %v", name, err))
546		} else {
547			numMeasurements += n
548		}
549	}
550
551	clusterID := s.MetaClient.ClusterID()
552	cl := client.New("")
553	usage := client.Usage{
554		Product: "influxdb",
555		Data: []client.UsageData{
556			{
557				Values: client.Values{
558					"os":               runtime.GOOS,
559					"arch":             runtime.GOARCH,
560					"version":          s.buildInfo.Version,
561					"cluster_id":       fmt.Sprintf("%v", clusterID),
562					"num_series":       numSeries,
563					"num_measurements": numMeasurements,
564					"num_databases":    numDatabases,
565					"uptime":           time.Since(startTime).Seconds(),
566				},
567			},
568		},
569	}
570
571	s.Logger.Info("Sending usage statistics to usage.influxdata.com")
572
573	go cl.Save(usage)
574}
575
// Service represents a service attached to the server.
//
// Server.Open calls WithLogger on every attached service before opening any
// of them and then calls Open in the order the services were appended.
// Server.Close calls Close in that same order.
type Service interface {
	// WithLogger hands the service the logger it should use.
	WithLogger(log *zap.Logger)
	// Open starts the service. A non-nil error aborts Server.Open.
	Open() error
	// Close stops the service. Server.Close does not inspect the returned error.
	Close() error
}
582
// prof stores the open files of the active profiles. A nil field means the
// corresponding profile is not running.
var prof struct {
	cpu *os.File
	mem *os.File
}

// startProfile initializes the cpu and memory profile, if specified.
//
// An empty path disables the corresponding profile. Failing to create either
// output file terminates the process via log.Fatalf. If the CPU profiler
// cannot be started (for example because CPU profiling is already active),
// the newly created file is closed, the failure is logged and any previously
// recorded CPU profile handle is left untouched.
func startProfile(cpuprofile, memprofile string) {
	if cpuprofile != "" {
		f, err := os.Create(cpuprofile)
		if err != nil {
			log.Fatalf("cpuprofile: %v", err)
		}
		if err := pprof.StartCPUProfile(f); err != nil {
			// Do not record a handle for a profile that never started;
			// stopProfile would otherwise stop and close the wrong thing.
			f.Close()
			log.Printf("cpuprofile: %v", err)
		} else {
			log.Printf("writing CPU profile to: %s\n", cpuprofile)
			prof.cpu = f
		}
	}

	if memprofile != "" {
		f, err := os.Create(memprofile)
		if err != nil {
			log.Fatalf("memprofile: %v", err)
		}
		log.Printf("writing mem profile to: %s\n", memprofile)
		prof.mem = f
		// Sample roughly one allocation per 4096 bytes allocated.
		runtime.MemProfileRate = 4096
	}
}

// stopProfile closes the cpu and memory profiles if they are running.
//
// The heap profile is written to the memory profile file at this point.
// Write and close failures are logged. Both handles are cleared afterwards,
// so a repeated call is a no-op instead of operating on closed files.
func stopProfile() {
	if prof.cpu != nil {
		pprof.StopCPUProfile()
		if err := prof.cpu.Close(); err != nil {
			log.Printf("cpuprofile: %v", err)
		}
		prof.cpu = nil
		log.Println("CPU profile stopped")
	}
	if prof.mem != nil {
		if err := pprof.Lookup("heap").WriteTo(prof.mem, 0); err != nil {
			log.Printf("memprofile: %v", err)
		}
		if err := prof.mem.Close(); err != nil {
			log.Printf("memprofile: %v", err)
		}
		prof.mem = nil
		log.Println("mem profile stopped")
	}
}
626
627// monitorPointsWriter is a wrapper around `coordinator.PointsWriter` that helps
628// to prevent a circular dependency between the `cluster` and `monitor` packages.
629type monitorPointsWriter coordinator.PointsWriter
630
631func (pw *monitorPointsWriter) WritePoints(database, retentionPolicy string, points models.Points) error {
632	return (*coordinator.PointsWriter)(pw).WritePointsPrivileged(database, retentionPolicy, models.ConsistencyLevelAny, points)
633}
634
// raftDBExists returns an error when a file named raft.db can be stat'ed
// inside dir, and nil otherwise. The error explains how to migrate or discard
// the old metadata. Any Stat failure, not only "file does not exist", is
// treated as the file being absent.
func raftDBExists(dir string) error {
	raftFile := filepath.Join(dir, "raft.db")

	_, statErr := os.Stat(raftFile)
	if statErr != nil {
		return nil
	}

	// The file is present: refuse to continue and tell the operator how to
	// downgrade, export and re-import the metadata, or how to reset.
	return fmt.Errorf("detected %s. To proceed, you'll need to either 1) downgrade to v0.11.x, export your metadata, upgrade to the current version again, and then import the metadata or 2) delete the file, which will effectively reset your database. For more assistance with the upgrade, see: https://docs.influxdata.com/influxdb/v0.12/administration/upgrading/", raftFile)
}
644