1// Copyright 2014 The Prometheus Authors 2// Licensed under the Apache License, Version 2.0 (the "License"); 3// you may not use this file except in compliance with the License. 4// You may obtain a copy of the License at 5// 6// http://www.apache.org/licenses/LICENSE-2.0 7// 8// Unless required by applicable law or agreed to in writing, software 9// distributed under the License is distributed on an "AS IS" BASIS, 10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11// See the License for the specific language governing permissions and 12// limitations under the License. 13 14package main 15 16import ( 17 "context" 18 "fmt" 19 "net" 20 "net/http" 21 "net/http/pprof" 22 "net/url" 23 "os" 24 "os/signal" 25 "path" 26 "path/filepath" 27 "strings" 28 "syscall" 29 30 "github.com/go-kit/log" 31 "github.com/go-kit/log/level" 32 "github.com/prometheus/client_golang/prometheus" 33 "github.com/prometheus/client_golang/prometheus/promhttp" 34 "github.com/prometheus/common/promlog" 35 "github.com/prometheus/common/route" 36 "github.com/prometheus/common/version" 37 "github.com/prometheus/exporter-toolkit/web" 38 webflag "github.com/prometheus/exporter-toolkit/web/kingpinflag" 39 "gopkg.in/alecthomas/kingpin.v2" 40 41 dto "github.com/prometheus/client_model/go" 42 promlogflag "github.com/prometheus/common/promlog/flag" 43 44 api_v1 "github.com/prometheus/pushgateway/api/v1" 45 "github.com/prometheus/pushgateway/asset" 46 "github.com/prometheus/pushgateway/handler" 47 "github.com/prometheus/pushgateway/storage" 48) 49 50func init() { 51 prometheus.MustRegister(version.NewCollector("pushgateway")) 52} 53 54// logFunc in an adaptor to plug gokit logging into promhttp.HandlerOpts. 55type logFunc func(...interface{}) error 56 57func (lf logFunc) Println(v ...interface{}) { 58 lf("msg", fmt.Sprintln(v...)) 59} 60 61func main() { 62 var ( 63 app = kingpin.New(filepath.Base(os.Args[0]), "The Pushgateway") 64 webConfig = webflag.AddFlags(app) 65 listenAddress = app.Flag("web.listen-address", "Address to listen on for the web interface, API, and telemetry.").Default(":9091").String() 66 metricsPath = app.Flag("web.telemetry-path", "Path under which to expose metrics.").Default("/metrics").String() 67 externalURL = app.Flag("web.external-url", "The URL under which the Pushgateway is externally reachable.").Default("").URL() 68 routePrefix = app.Flag("web.route-prefix", "Prefix for the internal routes of web endpoints. Defaults to the path of --web.external-url.").Default("").String() 69 enableLifeCycle = app.Flag("web.enable-lifecycle", "Enable shutdown via HTTP request.").Default("false").Bool() 70 enableAdminAPI = app.Flag("web.enable-admin-api", "Enable API endpoints for admin control actions.").Default("false").Bool() 71 persistenceFile = app.Flag("persistence.file", "File to persist metrics. If empty, metrics are only kept in memory.").Default("").String() 72 persistenceInterval = app.Flag("persistence.interval", "The minimum interval at which to write out the persistence file.").Default("5m").Duration() 73 pushUnchecked = app.Flag("push.disable-consistency-check", "Do not check consistency of pushed metrics. DANGEROUS.").Default("false").Bool() 74 promlogConfig = promlog.Config{} 75 ) 76 promlogflag.AddFlags(app, &promlogConfig) 77 app.Version(version.Print("pushgateway")) 78 app.HelpFlag.Short('h') 79 kingpin.MustParse(app.Parse(os.Args[1:])) 80 logger := promlog.New(&promlogConfig) 81 82 *routePrefix = computeRoutePrefix(*routePrefix, *externalURL) 83 externalPathPrefix := computeRoutePrefix("", *externalURL) 84 85 level.Info(logger).Log("msg", "starting pushgateway", "version", version.Info()) 86 level.Info(logger).Log("build_context", version.BuildContext()) 87 level.Debug(logger).Log("msg", "external URL", "url", *externalURL) 88 level.Debug(logger).Log("msg", "path prefix used externally", "path", externalPathPrefix) 89 level.Debug(logger).Log("msg", "path prefix for internal routing", "path", *routePrefix) 90 91 // flags is used to show command line flags on the status page. 92 // Kingpin default flags are excluded as they would be confusing. 93 flags := map[string]string{} 94 boilerplateFlags := kingpin.New("", "").Version("") 95 for _, f := range app.Model().Flags { 96 if boilerplateFlags.GetFlag(f.Name) == nil { 97 flags[f.Name] = f.Value.String() 98 } 99 } 100 101 ms := storage.NewDiskMetricStore(*persistenceFile, *persistenceInterval, prometheus.DefaultGatherer, logger) 102 103 // Create a Gatherer combining the DefaultGatherer and the metrics from the metric store. 104 g := prometheus.Gatherers{ 105 prometheus.DefaultGatherer, 106 prometheus.GathererFunc(func() ([]*dto.MetricFamily, error) { return ms.GetMetricFamilies(), nil }), 107 } 108 109 r := route.New() 110 r.Get(*routePrefix+"/-/healthy", handler.Healthy(ms).ServeHTTP) 111 r.Get(*routePrefix+"/-/ready", handler.Ready(ms).ServeHTTP) 112 r.Get( 113 path.Join(*routePrefix, *metricsPath), 114 promhttp.HandlerFor(g, promhttp.HandlerOpts{ 115 ErrorLog: logFunc(level.Error(logger).Log), 116 }).ServeHTTP, 117 ) 118 119 // Handlers for pushing and deleting metrics. 120 pushAPIPath := *routePrefix + "/metrics" 121 for _, suffix := range []string{"", handler.Base64Suffix} { 122 jobBase64Encoded := suffix == handler.Base64Suffix 123 r.Put(pushAPIPath+"/job"+suffix+"/:job/*labels", handler.Push(ms, true, !*pushUnchecked, jobBase64Encoded, logger)) 124 r.Post(pushAPIPath+"/job"+suffix+"/:job/*labels", handler.Push(ms, false, !*pushUnchecked, jobBase64Encoded, logger)) 125 r.Del(pushAPIPath+"/job"+suffix+"/:job/*labels", handler.Delete(ms, jobBase64Encoded, logger)) 126 r.Put(pushAPIPath+"/job"+suffix+"/:job", handler.Push(ms, true, !*pushUnchecked, jobBase64Encoded, logger)) 127 r.Post(pushAPIPath+"/job"+suffix+"/:job", handler.Push(ms, false, !*pushUnchecked, jobBase64Encoded, logger)) 128 r.Del(pushAPIPath+"/job"+suffix+"/:job", handler.Delete(ms, jobBase64Encoded, logger)) 129 } 130 r.Get(*routePrefix+"/static/*filepath", handler.Static(asset.Assets, *routePrefix).ServeHTTP) 131 132 statusHandler := handler.Status(ms, asset.Assets, flags, externalPathPrefix, logger) 133 r.Get(*routePrefix+"/status", statusHandler.ServeHTTP) 134 r.Get(*routePrefix+"/", statusHandler.ServeHTTP) 135 136 // Re-enable pprof. 137 r.Get(*routePrefix+"/debug/pprof/*pprof", handlePprof) 138 139 level.Info(logger).Log("listen_address", *listenAddress) 140 l, err := net.Listen("tcp", *listenAddress) 141 if err != nil { 142 level.Error(logger).Log("err", err) 143 os.Exit(1) 144 } 145 146 quitCh := make(chan struct{}) 147 quitHandler := func(w http.ResponseWriter, r *http.Request) { 148 fmt.Fprintf(w, "Requesting termination... Goodbye!") 149 close(quitCh) 150 } 151 152 forbiddenAPINotEnabled := func(w http.ResponseWriter, _ *http.Request) { 153 w.WriteHeader(http.StatusForbidden) 154 w.Write([]byte("Lifecycle API is not enabled.")) 155 } 156 157 if *enableLifeCycle { 158 r.Put(*routePrefix+"/-/quit", quitHandler) 159 r.Post(*routePrefix+"/-/quit", quitHandler) 160 } else { 161 r.Put(*routePrefix+"/-/quit", forbiddenAPINotEnabled) 162 r.Post(*routePrefix+"/-/quit", forbiddenAPINotEnabled) 163 } 164 165 r.Get("/-/quit", func(w http.ResponseWriter, _ *http.Request) { 166 w.WriteHeader(http.StatusMethodNotAllowed) 167 w.Write([]byte("Only POST or PUT requests allowed.")) 168 }) 169 170 mux := http.NewServeMux() 171 mux.Handle("/", r) 172 173 buildInfo := map[string]string{ 174 "version": version.Version, 175 "revision": version.Revision, 176 "branch": version.Branch, 177 "buildUser": version.BuildUser, 178 "buildDate": version.BuildDate, 179 "goVersion": version.GoVersion, 180 } 181 182 apiv1 := api_v1.New(logger, ms, flags, buildInfo) 183 184 apiPath := "/api" 185 if *routePrefix != "/" { 186 apiPath = *routePrefix + apiPath 187 } 188 189 av1 := route.New() 190 apiv1.Register(av1) 191 if *enableAdminAPI { 192 av1.Put("/admin/wipe", handler.WipeMetricStore(ms, logger).ServeHTTP) 193 } 194 195 mux.Handle(apiPath+"/v1/", http.StripPrefix(apiPath+"/v1", av1)) 196 197 server := &http.Server{ 198 Addr: *listenAddress, 199 Handler: mux, 200 } 201 202 go shutdownServerOnQuit(server, quitCh, logger) 203 err = web.Serve(l, server, *webConfig, logger) 204 205 // In the case of a graceful shutdown, do not log the error. 206 if err == http.ErrServerClosed { 207 level.Info(logger).Log("msg", "HTTP server stopped") 208 } else { 209 level.Error(logger).Log("msg", "HTTP server stopped", "err", err) 210 } 211 212 if err := ms.Shutdown(); err != nil { 213 level.Error(logger).Log("msg", "problem shutting down metric storage", "err", err) 214 } 215} 216 217func handlePprof(w http.ResponseWriter, r *http.Request) { 218 switch route.Param(r.Context(), "pprof") { 219 case "/cmdline": 220 pprof.Cmdline(w, r) 221 case "/profile": 222 pprof.Profile(w, r) 223 case "/symbol": 224 pprof.Symbol(w, r) 225 default: 226 pprof.Index(w, r) 227 } 228} 229 230// computeRoutePrefix returns the effective route prefix based on the 231// provided flag values for --web.route-prefix and 232// --web.external-url. With prefix empty, the path of externalURL is 233// used instead. A prefix "/" results in an empty returned prefix. Any 234// non-empty prefix is normalized to start, but not to end, with "/". 235func computeRoutePrefix(prefix string, externalURL *url.URL) string { 236 if prefix == "" { 237 prefix = externalURL.Path 238 } 239 240 if prefix == "/" { 241 prefix = "" 242 } 243 244 if prefix != "" { 245 prefix = "/" + strings.Trim(prefix, "/") 246 } 247 248 return prefix 249} 250 251// shutdownServerOnQuit shutdowns the provided server upon closing the provided 252// quitCh or upon receiving a SIGINT or SIGTERM. 253func shutdownServerOnQuit(server *http.Server, quitCh <-chan struct{}, logger log.Logger) error { 254 notifier := make(chan os.Signal, 1) 255 signal.Notify(notifier, os.Interrupt, syscall.SIGTERM) 256 257 select { 258 case <-notifier: 259 level.Info(logger).Log("msg", "received SIGINT/SIGTERM; exiting gracefully...") 260 break 261 case <-quitCh: 262 level.Warn(logger).Log("msg", "received termination request via web service, exiting gracefully...") 263 break 264 } 265 return server.Shutdown(context.Background()) 266} 267