1// This file and its contents are licensed under the Apache License 2.0.
2// Please see the included NOTICE for copyright information and
3// LICENSE for a copy of the license.
4package runner
5
6// Based on the Prometheus remote storage example:
7// documentation/examples/remote_storage/remote_storage_adapter/main.go
8
9import (
10	"fmt"
11	"net"
12	"net/http"
13	"time"
14
15	_ "github.com/jackc/pgx/v4/stdlib"
16	"github.com/thanos-io/thanos/pkg/store/storepb"
17	"github.com/timescale/promscale/pkg/api"
18	"github.com/timescale/promscale/pkg/log"
19	"github.com/timescale/promscale/pkg/thanos"
20	"github.com/timescale/promscale/pkg/util"
21	tput "github.com/timescale/promscale/pkg/util/throughput"
22	"github.com/timescale/promscale/pkg/version"
23	"google.golang.org/grpc"
24)
25
26const promLivenessCheck = time.Second
27
28var (
29	elector      *util.Elector
30	startupError = fmt.Errorf("startup error")
31)
32
33func Run(cfg *Config) error {
34	log.Info("msg", "Version:"+version.Promscale+"; Commit Hash: "+version.CommitHash)
35
36	redacted := *cfg
37	redacted.PgmodelCfg.Password = "****"
38	redacted.PgmodelCfg.DbUri = "****"
39	log.Info("config", fmt.Sprintf("%+v", redacted))
40
41	if cfg.APICfg.ReadOnly {
42		log.Info("msg", "Migrations disabled for read-only mode")
43	} else {
44		tput.InitWatcher(cfg.ThroughputInterval)
45	}
46
47	promMetrics := api.InitMetrics()
48	client, err := CreateClient(cfg, promMetrics)
49	if err != nil {
50		log.Error("msg", "aborting startup due to error", "err", err.Error())
51		return startupError
52	}
53
54	if client == nil {
55		return nil
56	}
57
58	defer client.Close()
59
60	router, err := api.GenerateRouter(&cfg.APICfg, client, elector)
61	if err != nil {
62		log.Error("msg", "aborting startup due to error", "err", fmt.Sprintf("generate router: %s", err.Error()))
63		return fmt.Errorf("generate router: %w", err)
64	}
65
66	log.Info("msg", "Starting up...")
67	log.Info("msg", "Listening", "addr", cfg.ListenAddr)
68
69	if len(cfg.ThanosStoreAPIListenAddr) > 0 {
70		srv := thanos.NewStorage(client.Queryable())
71		grpcServer := grpc.NewServer()
72		storepb.RegisterStoreServer(grpcServer, srv)
73
74		go func() {
75			log.Info("msg", fmt.Sprintf("Start listening for Thanos StoreAPI on %s", cfg.ThanosStoreAPIListenAddr))
76			listener, err := net.Listen("tcp", cfg.ThanosStoreAPIListenAddr)
77			if err != nil {
78				log.Error("msg", "Listening for Thanos StoreAPI failed", "err", err)
79				return
80			}
81
82			log.Info("msg", "Start thanos-store")
83			if err := grpcServer.Serve(listener); err != nil {
84				log.Error("msg", "Starting the Thanos store failed", "err", err)
85				return
86			}
87		}()
88	}
89
90	mux := http.NewServeMux()
91	mux.Handle("/", router)
92
93	if cfg.TLSCertFile != "" {
94		err = http.ListenAndServeTLS(cfg.ListenAddr, cfg.TLSCertFile, cfg.TLSKeyFile, mux)
95	} else {
96		err = http.ListenAndServe(cfg.ListenAddr, mux)
97	}
98
99	if err != nil {
100		log.Error("msg", "Listen failure", "err", err)
101		return startupError
102	}
103
104	return nil
105}
106