1// This file and its contents are licensed under the Apache License 2.0. 2// Please see the included NOTICE for copyright information and 3// LICENSE for a copy of the license. 4package runner 5 6// Based on the Prometheus remote storage example: 7// documentation/examples/remote_storage/remote_storage_adapter/main.go 8 9import ( 10 "fmt" 11 "net" 12 "net/http" 13 "time" 14 15 _ "github.com/jackc/pgx/v4/stdlib" 16 "github.com/thanos-io/thanos/pkg/store/storepb" 17 "github.com/timescale/promscale/pkg/api" 18 "github.com/timescale/promscale/pkg/log" 19 "github.com/timescale/promscale/pkg/thanos" 20 "github.com/timescale/promscale/pkg/util" 21 tput "github.com/timescale/promscale/pkg/util/throughput" 22 "github.com/timescale/promscale/pkg/version" 23 "google.golang.org/grpc" 24) 25 26const promLivenessCheck = time.Second 27 28var ( 29 elector *util.Elector 30 startupError = fmt.Errorf("startup error") 31) 32 33func Run(cfg *Config) error { 34 log.Info("msg", "Version:"+version.Promscale+"; Commit Hash: "+version.CommitHash) 35 36 redacted := *cfg 37 redacted.PgmodelCfg.Password = "****" 38 redacted.PgmodelCfg.DbUri = "****" 39 log.Info("config", fmt.Sprintf("%+v", redacted)) 40 41 if cfg.APICfg.ReadOnly { 42 log.Info("msg", "Migrations disabled for read-only mode") 43 } else { 44 tput.InitWatcher(cfg.ThroughputInterval) 45 } 46 47 promMetrics := api.InitMetrics() 48 client, err := CreateClient(cfg, promMetrics) 49 if err != nil { 50 log.Error("msg", "aborting startup due to error", "err", err.Error()) 51 return startupError 52 } 53 54 if client == nil { 55 return nil 56 } 57 58 defer client.Close() 59 60 router, err := api.GenerateRouter(&cfg.APICfg, client, elector) 61 if err != nil { 62 log.Error("msg", "aborting startup due to error", "err", fmt.Sprintf("generate router: %s", err.Error())) 63 return fmt.Errorf("generate router: %w", err) 64 } 65 66 log.Info("msg", "Starting up...") 67 log.Info("msg", "Listening", "addr", cfg.ListenAddr) 68 69 if len(cfg.ThanosStoreAPIListenAddr) > 0 { 70 srv := thanos.NewStorage(client.Queryable()) 71 grpcServer := grpc.NewServer() 72 storepb.RegisterStoreServer(grpcServer, srv) 73 74 go func() { 75 log.Info("msg", fmt.Sprintf("Start listening for Thanos StoreAPI on %s", cfg.ThanosStoreAPIListenAddr)) 76 listener, err := net.Listen("tcp", cfg.ThanosStoreAPIListenAddr) 77 if err != nil { 78 log.Error("msg", "Listening for Thanos StoreAPI failed", "err", err) 79 return 80 } 81 82 log.Info("msg", "Start thanos-store") 83 if err := grpcServer.Serve(listener); err != nil { 84 log.Error("msg", "Starting the Thanos store failed", "err", err) 85 return 86 } 87 }() 88 } 89 90 mux := http.NewServeMux() 91 mux.Handle("/", router) 92 93 if cfg.TLSCertFile != "" { 94 err = http.ListenAndServeTLS(cfg.ListenAddr, cfg.TLSCertFile, cfg.TLSKeyFile, mux) 95 } else { 96 err = http.ListenAndServe(cfg.ListenAddr, mux) 97 } 98 99 if err != nil { 100 log.Error("msg", "Listen failure", "err", err) 101 return startupError 102 } 103 104 return nil 105} 106