1// Package httpd implements the HTTP service and REST API for InfluxDB.
2package httpd // import "github.com/influxdata/influxdb/services/httpd"
3
4import (
5	"crypto/tls"
6	"fmt"
7	"net"
8	"net/http"
9	"os"
10	"path"
11	"runtime"
12	"strings"
13	"syscall"
14	"time"
15
16	"github.com/influxdata/influxdb/models"
17	"go.uber.org/zap"
18)
19
// Names of the statistics gathered by the httpd package.
const (
	// Number of HTTP requests served.
	statRequest = "req"
	// Number of query requests served.
	statQueryRequest = "queryReq"
	// Number of write requests served.
	statWriteRequest = "writeReq"
	// Number of ping requests served.
	statPingRequest = "pingReq"
	// Number of status requests served.
	statStatusRequest = "statusReq"
	// Sum of all bytes in write requests.
	statWriteRequestBytesReceived = "writeReqBytes"
	// Sum of all bytes returned in query responses.
	statQueryRequestBytesTransmitted = "queryRespBytes"
	// Number of points written OK.
	statPointsWrittenOK = "pointsWrittenOK"
	// Number of points dropped by the storage engine.
	statPointsWrittenDropped = "pointsWrittenDropped"
	// Number of points that failed to be written.
	statPointsWrittenFail = "pointsWrittenFail"
	// Number of authentication failures.
	statAuthFail = "authFail"
	// Number of (wall-time) nanoseconds spent inside requests.
	statRequestDuration = "reqDurationNs"
	// Number of (wall-time) nanoseconds spent inside query requests.
	statQueryRequestDuration = "queryReqDurationNs"
	// Number of (wall-time) nanoseconds spent inside write requests.
	statWriteRequestDuration = "writeReqDurationNs"
	// Number of currently active requests.
	statRequestsActive = "reqActive"
	// Number of currently active write requests.
	statWriteRequestsActive = "writeReqActive"
	// Number of HTTP responses due to client error.
	statClientError = "clientError"
	// Number of HTTP responses due to server error.
	statServerError = "serverError"
	// Number of panics recovered by HTTP handler.
	statRecoveredPanics = "recoveredPanics"
	// Number of write requests to the prometheus endpoint.
	statPromWriteRequest = "promWriteReq"
	// Number of read requests to the prometheus endpoint.
	statPromReadRequest = "promReadReq"
	// Number of flux query requests served.
	statFluxQueryRequests = "fluxQueryReq"
	// Number of (wall-time) nanoseconds spent executing Flux query requests.
	statFluxQueryRequestDuration = "fluxQueryReqDurationNs"
)
47
48// Service manages the listener and handler for an HTTP endpoint.
49type Service struct {
50	ln        net.Listener
51	addr      string
52	https     bool
53	cert      string
54	key       string
55	limit     int
56	tlsConfig *tls.Config
57	err       chan error
58
59	unixSocket         bool
60	unixSocketPerm     uint32
61	unixSocketGroup    int
62	bindSocket         string
63	unixSocketListener net.Listener
64
65	Handler *Handler
66
67	Logger *zap.Logger
68}
69
70// NewService returns a new instance of Service.
71func NewService(c Config) *Service {
72	s := &Service{
73		addr:           c.BindAddress,
74		https:          c.HTTPSEnabled,
75		cert:           c.HTTPSCertificate,
76		key:            c.HTTPSPrivateKey,
77		limit:          c.MaxConnectionLimit,
78		tlsConfig:      c.TLS,
79		err:            make(chan error),
80		unixSocket:     c.UnixSocketEnabled,
81		unixSocketPerm: uint32(c.UnixSocketPermissions),
82		bindSocket:     c.BindSocket,
83		Handler:        NewHandler(c),
84		Logger:         zap.NewNop(),
85	}
86	if s.tlsConfig == nil {
87		s.tlsConfig = new(tls.Config)
88	}
89	if s.key == "" {
90		s.key = s.cert
91	}
92	if c.UnixSocketGroup != nil {
93		s.unixSocketGroup = int(*c.UnixSocketGroup)
94	}
95	s.Handler.Logger = s.Logger
96	return s
97}
98
99// Open starts the service.
100func (s *Service) Open() error {
101	s.Logger.Info("Starting HTTP service", zap.Bool("authentication", s.Handler.Config.AuthEnabled))
102
103	s.Handler.Open()
104
105	// Open listener.
106	if s.https {
107		cert, err := tls.LoadX509KeyPair(s.cert, s.key)
108		if err != nil {
109			return err
110		}
111
112		tlsConfig := s.tlsConfig.Clone()
113		tlsConfig.Certificates = []tls.Certificate{cert}
114
115		listener, err := tls.Listen("tcp", s.addr, tlsConfig)
116		if err != nil {
117			return err
118		}
119
120		s.ln = listener
121	} else {
122		listener, err := net.Listen("tcp", s.addr)
123		if err != nil {
124			return err
125		}
126
127		s.ln = listener
128	}
129	s.Logger.Info("Listening on HTTP",
130		zap.Stringer("addr", s.ln.Addr()),
131		zap.Bool("https", s.https))
132
133	// Open unix socket listener.
134	if s.unixSocket {
135		if runtime.GOOS == "windows" {
136			return fmt.Errorf("unable to use unix socket on windows")
137		}
138		if err := os.MkdirAll(path.Dir(s.bindSocket), 0777); err != nil {
139			return err
140		}
141		if err := syscall.Unlink(s.bindSocket); err != nil && !os.IsNotExist(err) {
142			return err
143		}
144
145		listener, err := net.Listen("unix", s.bindSocket)
146		if err != nil {
147			return err
148		}
149		if s.unixSocketPerm != 0 {
150			if err := os.Chmod(s.bindSocket, os.FileMode(s.unixSocketPerm)); err != nil {
151				return err
152			}
153		}
154		if s.unixSocketGroup != 0 {
155			if err := os.Chown(s.bindSocket, -1, s.unixSocketGroup); err != nil {
156				return err
157			}
158		}
159
160		s.Logger.Info("Listening on unix socket",
161			zap.Stringer("addr", listener.Addr()))
162		s.unixSocketListener = listener
163
164		go s.serveUnixSocket()
165	}
166
167	// Enforce a connection limit if one has been given.
168	if s.limit > 0 {
169		s.ln = LimitListener(s.ln, s.limit)
170	}
171
172	// wait for the listeners to start
173	timeout := time.Now().Add(time.Second)
174	for {
175		if s.ln.Addr() != nil {
176			break
177		}
178
179		if time.Now().After(timeout) {
180			return fmt.Errorf("unable to open without http listener running")
181		}
182		time.Sleep(10 * time.Millisecond)
183	}
184
185	// Begin listening for requests in a separate goroutine.
186	go s.serveTCP()
187	return nil
188}
189
190// Close closes the underlying listener.
191func (s *Service) Close() error {
192	s.Handler.Close()
193
194	if s.ln != nil {
195		if err := s.ln.Close(); err != nil {
196			return err
197		}
198	}
199	if s.unixSocketListener != nil {
200		if err := s.unixSocketListener.Close(); err != nil {
201			return err
202		}
203	}
204	return nil
205}
206
207// WithLogger sets the logger for the service.
208func (s *Service) WithLogger(log *zap.Logger) {
209	s.Logger = log.With(zap.String("service", "httpd"))
210	s.Handler.Logger = s.Logger
211}
212
213// Err returns a channel for fatal errors that occur on the listener.
214func (s *Service) Err() <-chan error { return s.err }
215
216// Addr returns the listener's address. Returns nil if listener is closed.
217func (s *Service) Addr() net.Addr {
218	if s.ln != nil {
219		return s.ln.Addr()
220	}
221	return nil
222}
223
224// Statistics returns statistics for periodic monitoring.
225func (s *Service) Statistics(tags map[string]string) []models.Statistic {
226	return s.Handler.Statistics(models.NewTags(map[string]string{"bind": s.addr}).Merge(tags).Map())
227}
228
229// BoundHTTPAddr returns the string version of the address that the HTTP server is listening on.
230// This is useful if you start an ephemeral server in test with bind address localhost:0.
231func (s *Service) BoundHTTPAddr() string {
232	return s.ln.Addr().String()
233}
234
235// serveTCP serves the handler from the TCP listener.
236func (s *Service) serveTCP() {
237	s.serve(s.ln)
238}
239
240// serveUnixSocket serves the handler from the unix socket listener.
241func (s *Service) serveUnixSocket() {
242	s.serve(s.unixSocketListener)
243}
244
245// serve serves the handler from the listener.
246func (s *Service) serve(listener net.Listener) {
247	// The listener was closed so exit
248	// See https://github.com/golang/go/issues/4373
249	err := http.Serve(listener, s.Handler)
250	if err != nil && !strings.Contains(err.Error(), "closed") {
251		s.err <- fmt.Errorf("listener failed: addr=%s, err=%s", s.Addr(), err)
252	}
253}
254