1// Package httpd implements the HTTP service and REST API for InfluxDB. 2package httpd // import "github.com/influxdata/influxdb/services/httpd" 3 4import ( 5 "crypto/tls" 6 "fmt" 7 "net" 8 "net/http" 9 "os" 10 "path" 11 "runtime" 12 "strings" 13 "syscall" 14 "time" 15 16 "github.com/influxdata/influxdb/models" 17 "go.uber.org/zap" 18) 19 20// statistics gathered by the httpd package. 21const ( 22 statRequest = "req" // Number of HTTP requests served. 23 statQueryRequest = "queryReq" // Number of query requests served. 24 statWriteRequest = "writeReq" // Number of write requests serverd. 25 statPingRequest = "pingReq" // Number of ping requests served. 26 statStatusRequest = "statusReq" // Number of status requests served. 27 statWriteRequestBytesReceived = "writeReqBytes" // Sum of all bytes in write requests. 28 statQueryRequestBytesTransmitted = "queryRespBytes" // Sum of all bytes returned in query reponses. 29 statPointsWrittenOK = "pointsWrittenOK" // Number of points written OK. 30 statPointsWrittenDropped = "pointsWrittenDropped" // Number of points dropped by the storage engine. 31 statPointsWrittenFail = "pointsWrittenFail" // Number of points that failed to be written. 32 statAuthFail = "authFail" // Number of authentication failures. 33 statRequestDuration = "reqDurationNs" // Number of (wall-time) nanoseconds spent inside requests. 34 statQueryRequestDuration = "queryReqDurationNs" // Number of (wall-time) nanoseconds spent inside query requests. 35 statWriteRequestDuration = "writeReqDurationNs" // Number of (wall-time) nanoseconds spent inside write requests. 36 statRequestsActive = "reqActive" // Number of currently active requests. 37 statWriteRequestsActive = "writeReqActive" // Number of currently active write requests. 38 statClientError = "clientError" // Number of HTTP responses due to client error. 39 statServerError = "serverError" // Number of HTTP responses due to server error. 40 statRecoveredPanics = "recoveredPanics" // Number of panics recovered by HTTP handler. 41 statPromWriteRequest = "promWriteReq" // Number of write requests to the prometheus endpoint. 42 statPromReadRequest = "promReadReq" // Number of read requests to the prometheus endpoint. 43 statFluxQueryRequests = "fluxQueryReq" // Number of flux query requests served. 44 statFluxQueryRequestDuration = "fluxQueryReqDurationNs" // Number of (wall-time) nanoseconds spent executing Flux query requests. 45 46) 47 48// Service manages the listener and handler for an HTTP endpoint. 49type Service struct { 50 ln net.Listener 51 addr string 52 https bool 53 cert string 54 key string 55 limit int 56 tlsConfig *tls.Config 57 err chan error 58 59 unixSocket bool 60 unixSocketPerm uint32 61 unixSocketGroup int 62 bindSocket string 63 unixSocketListener net.Listener 64 65 Handler *Handler 66 67 Logger *zap.Logger 68} 69 70// NewService returns a new instance of Service. 71func NewService(c Config) *Service { 72 s := &Service{ 73 addr: c.BindAddress, 74 https: c.HTTPSEnabled, 75 cert: c.HTTPSCertificate, 76 key: c.HTTPSPrivateKey, 77 limit: c.MaxConnectionLimit, 78 tlsConfig: c.TLS, 79 err: make(chan error), 80 unixSocket: c.UnixSocketEnabled, 81 unixSocketPerm: uint32(c.UnixSocketPermissions), 82 bindSocket: c.BindSocket, 83 Handler: NewHandler(c), 84 Logger: zap.NewNop(), 85 } 86 if s.tlsConfig == nil { 87 s.tlsConfig = new(tls.Config) 88 } 89 if s.key == "" { 90 s.key = s.cert 91 } 92 if c.UnixSocketGroup != nil { 93 s.unixSocketGroup = int(*c.UnixSocketGroup) 94 } 95 s.Handler.Logger = s.Logger 96 return s 97} 98 99// Open starts the service. 100func (s *Service) Open() error { 101 s.Logger.Info("Starting HTTP service", zap.Bool("authentication", s.Handler.Config.AuthEnabled)) 102 103 s.Handler.Open() 104 105 // Open listener. 106 if s.https { 107 cert, err := tls.LoadX509KeyPair(s.cert, s.key) 108 if err != nil { 109 return err 110 } 111 112 tlsConfig := s.tlsConfig.Clone() 113 tlsConfig.Certificates = []tls.Certificate{cert} 114 115 listener, err := tls.Listen("tcp", s.addr, tlsConfig) 116 if err != nil { 117 return err 118 } 119 120 s.ln = listener 121 } else { 122 listener, err := net.Listen("tcp", s.addr) 123 if err != nil { 124 return err 125 } 126 127 s.ln = listener 128 } 129 s.Logger.Info("Listening on HTTP", 130 zap.Stringer("addr", s.ln.Addr()), 131 zap.Bool("https", s.https)) 132 133 // Open unix socket listener. 134 if s.unixSocket { 135 if runtime.GOOS == "windows" { 136 return fmt.Errorf("unable to use unix socket on windows") 137 } 138 if err := os.MkdirAll(path.Dir(s.bindSocket), 0777); err != nil { 139 return err 140 } 141 if err := syscall.Unlink(s.bindSocket); err != nil && !os.IsNotExist(err) { 142 return err 143 } 144 145 listener, err := net.Listen("unix", s.bindSocket) 146 if err != nil { 147 return err 148 } 149 if s.unixSocketPerm != 0 { 150 if err := os.Chmod(s.bindSocket, os.FileMode(s.unixSocketPerm)); err != nil { 151 return err 152 } 153 } 154 if s.unixSocketGroup != 0 { 155 if err := os.Chown(s.bindSocket, -1, s.unixSocketGroup); err != nil { 156 return err 157 } 158 } 159 160 s.Logger.Info("Listening on unix socket", 161 zap.Stringer("addr", listener.Addr())) 162 s.unixSocketListener = listener 163 164 go s.serveUnixSocket() 165 } 166 167 // Enforce a connection limit if one has been given. 168 if s.limit > 0 { 169 s.ln = LimitListener(s.ln, s.limit) 170 } 171 172 // wait for the listeners to start 173 timeout := time.Now().Add(time.Second) 174 for { 175 if s.ln.Addr() != nil { 176 break 177 } 178 179 if time.Now().After(timeout) { 180 return fmt.Errorf("unable to open without http listener running") 181 } 182 time.Sleep(10 * time.Millisecond) 183 } 184 185 // Begin listening for requests in a separate goroutine. 186 go s.serveTCP() 187 return nil 188} 189 190// Close closes the underlying listener. 191func (s *Service) Close() error { 192 s.Handler.Close() 193 194 if s.ln != nil { 195 if err := s.ln.Close(); err != nil { 196 return err 197 } 198 } 199 if s.unixSocketListener != nil { 200 if err := s.unixSocketListener.Close(); err != nil { 201 return err 202 } 203 } 204 return nil 205} 206 207// WithLogger sets the logger for the service. 208func (s *Service) WithLogger(log *zap.Logger) { 209 s.Logger = log.With(zap.String("service", "httpd")) 210 s.Handler.Logger = s.Logger 211} 212 213// Err returns a channel for fatal errors that occur on the listener. 214func (s *Service) Err() <-chan error { return s.err } 215 216// Addr returns the listener's address. Returns nil if listener is closed. 217func (s *Service) Addr() net.Addr { 218 if s.ln != nil { 219 return s.ln.Addr() 220 } 221 return nil 222} 223 224// Statistics returns statistics for periodic monitoring. 225func (s *Service) Statistics(tags map[string]string) []models.Statistic { 226 return s.Handler.Statistics(models.NewTags(map[string]string{"bind": s.addr}).Merge(tags).Map()) 227} 228 229// BoundHTTPAddr returns the string version of the address that the HTTP server is listening on. 230// This is useful if you start an ephemeral server in test with bind address localhost:0. 231func (s *Service) BoundHTTPAddr() string { 232 return s.ln.Addr().String() 233} 234 235// serveTCP serves the handler from the TCP listener. 236func (s *Service) serveTCP() { 237 s.serve(s.ln) 238} 239 240// serveUnixSocket serves the handler from the unix socket listener. 241func (s *Service) serveUnixSocket() { 242 s.serve(s.unixSocketListener) 243} 244 245// serve serves the handler from the listener. 246func (s *Service) serve(listener net.Listener) { 247 // The listener was closed so exit 248 // See https://github.com/golang/go/issues/4373 249 err := http.Serve(listener, s.Handler) 250 if err != nil && !strings.Contains(err.Error(), "closed") { 251 s.err <- fmt.Errorf("listener failed: addr=%s, err=%s", s.Addr(), err) 252 } 253} 254