1// Copyright (C) 2018 Audrius Butkevicius and Contributors (see the CONTRIBUTORS file).
2
3package main
4
5import (
6	"encoding/json"
7	"net"
8	"net/http"
9	"os"
10	"time"
11
12	"github.com/prometheus/client_golang/prometheus"
13	"github.com/syncthing/syncthing/lib/sync"
14)
15
16func init() {
17	processCollectorOpts := prometheus.ProcessCollectorOpts{
18		Namespace: "syncthing_relaypoolsrv",
19		PidFn: func() (int, error) {
20			return os.Getpid(), nil
21		},
22	}
23
24	prometheus.MustRegister(
25		prometheus.NewProcessCollector(processCollectorOpts),
26	)
27}
28
29var (
30	statusClient = http.Client{
31		Timeout: 5 * time.Second,
32	}
33
34	apiRequestsTotal   = makeCounter("api_requests_total", "Number of API requests.", "type", "result")
35	apiRequestsSeconds = makeSummary("api_requests_seconds", "Latency of API requests.", "type")
36
37	relayTestsTotal         = makeCounter("tests_total", "Number of relay tests.", "result")
38	relayTestActionsSeconds = makeSummary("test_actions_seconds", "Latency of relay test actions.", "type")
39
40	locationLookupSeconds = makeSummary("location_lookup_seconds", "Latency of location lookups.").WithLabelValues()
41
42	metricsRequestsSeconds = makeSummary("metrics_requests_seconds", "Latency of metric requests.").WithLabelValues()
43	scrapeSeconds          = makeSummary("relay_scrape_seconds", "Latency of metric scrapes from remote relays.", "result")
44
45	relayUptime             = makeGauge("relay_uptime", "Uptime of relay", "relay")
46	relayPendingSessionKeys = makeGauge("relay_pending_session_keys", "Number of pending session keys (two keys per session, one per each side of the connection)", "relay")
47	relayActiveSessions     = makeGauge("relay_active_sessions", "Number of sessions that are happening, a session contains two parties", "relay")
48	relayConnections        = makeGauge("relay_connections", "Number of devices connected to the relay", "relay")
49	relayProxies            = makeGauge("relay_proxies", "Number of active proxy routines sending data between peers (two proxies per session, one for each way)", "relay")
50	relayBytesProxied       = makeGauge("relay_bytes_proxied", "Number of bytes proxied by the relay", "relay")
51	relayGoRoutines         = makeGauge("relay_go_routines", "Number of Go routines in the process", "relay")
52	relaySessionRate        = makeGauge("relay_session_rate", "Rate applied per session", "relay")
53	relayGlobalRate         = makeGauge("relay_global_rate", "Global rate applied on the whole relay", "relay")
54	relayBuildInfo          = makeGauge("relay_build_info", "Build information about a relay", "relay", "go_version", "go_os", "go_arch")
55	relayLocationInfo       = makeGauge("relay_location_info", "Location information about a relay", "relay", "city", "country", "continent")
56
57	lastStats = make(map[string]stats)
58)
59
60func makeGauge(name string, help string, labels ...string) *prometheus.GaugeVec {
61	gauge := prometheus.NewGaugeVec(
62		prometheus.GaugeOpts{
63			Namespace: "syncthing",
64			Subsystem: "relaypoolsrv",
65			Name:      name,
66			Help:      help,
67		},
68		labels,
69	)
70	prometheus.MustRegister(gauge)
71	return gauge
72}
73
74func makeSummary(name string, help string, labels ...string) *prometheus.SummaryVec {
75	summary := prometheus.NewSummaryVec(
76		prometheus.SummaryOpts{
77			Namespace:  "syncthing",
78			Subsystem:  "relaypoolsrv",
79			Name:       name,
80			Help:       help,
81			Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
82		},
83		labels,
84	)
85	prometheus.MustRegister(summary)
86	return summary
87}
88
89func makeCounter(name string, help string, labels ...string) *prometheus.CounterVec {
90	counter := prometheus.NewCounterVec(
91		prometheus.CounterOpts{
92			Namespace: "syncthing",
93			Subsystem: "relaypoolsrv",
94			Name:      name,
95			Help:      help,
96		},
97		labels,
98	)
99	prometheus.MustRegister(counter)
100	return counter
101}
102
103func statsRefresher(interval time.Duration) {
104	ticker := time.NewTicker(interval)
105	for range ticker.C {
106		refreshStats()
107	}
108}
109
110type statsFetchResult struct {
111	relay *relay
112	stats *stats
113}
114
115func refreshStats() {
116	mut.RLock()
117	relays := append(permanentRelays, knownRelays...)
118	mut.RUnlock()
119
120	now := time.Now()
121	wg := sync.NewWaitGroup()
122
123	results := make(chan statsFetchResult, len(relays))
124	for _, rel := range relays {
125		wg.Add(1)
126		go func(rel *relay) {
127			t0 := time.Now()
128			stats := fetchStats(rel)
129			duration := time.Since(t0).Seconds()
130			result := "success"
131			if stats == nil {
132				result = "failed"
133			}
134			scrapeSeconds.WithLabelValues(result).Observe(duration)
135
136			results <- statsFetchResult{
137				relay: rel,
138				stats: fetchStats(rel),
139			}
140			wg.Done()
141		}(rel)
142	}
143
144	wg.Wait()
145	close(results)
146
147	mut.Lock()
148	relayBuildInfo.Reset()
149	relayLocationInfo.Reset()
150	for result := range results {
151		result.relay.StatsRetrieved = now
152		result.relay.Stats = result.stats
153		if result.stats == nil {
154			deleteMetrics(result.relay.uri.Host)
155		} else {
156			updateMetrics(result.relay.uri.Host, *result.stats, result.relay.Location)
157		}
158	}
159	mut.Unlock()
160}
161
162func fetchStats(relay *relay) *stats {
163	statusAddr := relay.uri.Query().Get("statusAddr")
164	if statusAddr == "" {
165		statusAddr = ":22070"
166	}
167
168	statusHost, statusPort, err := net.SplitHostPort(statusAddr)
169	if err != nil {
170		return nil
171	}
172
173	if statusHost == "" {
174		if host, _, err := net.SplitHostPort(relay.uri.Host); err != nil {
175			return nil
176		} else {
177			statusHost = host
178		}
179	}
180
181	url := "http://" + net.JoinHostPort(statusHost, statusPort) + "/status"
182
183	response, err := statusClient.Get(url)
184	if err != nil {
185		return nil
186	}
187
188	var stats stats
189
190	if json.NewDecoder(response.Body).Decode(&stats); err != nil {
191		return nil
192	}
193	return &stats
194}
195
196func updateMetrics(host string, stats stats, location location) {
197	if stats.GoVersion != "" || stats.GoOS != "" || stats.GoArch != "" {
198		relayBuildInfo.WithLabelValues(host, stats.GoVersion, stats.GoOS, stats.GoArch).Add(1)
199	}
200	if location.City != "" || location.Country != "" || location.Continent != "" {
201		relayLocationInfo.WithLabelValues(host, location.City, location.Country, location.Continent).Add(1)
202	}
203
204	if lastStat, ok := lastStats[host]; ok {
205		stats = mergeStats(stats, lastStat)
206	}
207
208	relayUptime.WithLabelValues(host).Set(float64(stats.UptimeSeconds))
209	relayPendingSessionKeys.WithLabelValues(host).Set(float64(stats.PendingSessionKeys))
210	relayActiveSessions.WithLabelValues(host).Set(float64(stats.ActiveSessions))
211	relayConnections.WithLabelValues(host).Set(float64(stats.Connections))
212	relayProxies.WithLabelValues(host).Set(float64(stats.Proxies))
213	relayBytesProxied.WithLabelValues(host).Set(float64(stats.BytesProxied))
214	relayGoRoutines.WithLabelValues(host).Set(float64(stats.GoRoutines))
215	relaySessionRate.WithLabelValues(host).Set(float64(stats.Options.SessionRate))
216	relayGlobalRate.WithLabelValues(host).Set(float64(stats.Options.GlobalRate))
217	lastStats[host] = stats
218}
219
220func deleteMetrics(host string) {
221	relayUptime.DeleteLabelValues(host)
222	relayPendingSessionKeys.DeleteLabelValues(host)
223	relayActiveSessions.DeleteLabelValues(host)
224	relayConnections.DeleteLabelValues(host)
225	relayProxies.DeleteLabelValues(host)
226	relayBytesProxied.DeleteLabelValues(host)
227	relayGoRoutines.DeleteLabelValues(host)
228	relaySessionRate.DeleteLabelValues(host)
229	relayGlobalRate.DeleteLabelValues(host)
230	delete(lastStats, host)
231}
232
233// Due to some unexplainable behaviour, some of the numbers sometimes travel slightly backwards (by less than 1%)
234// This happens between scrapes, which is 30s, so this can't be a race.
235// This causes prometheus to assume a "rate reset", hence causes phenomenal spikes.
236// One of the number that moves backwards is BytesProxied, which atomically increments a counter with numeric value
237// returned by net.Conn.Read(). I don't think that can return a negative value, so I have no idea what's going on.
238func mergeStats(new stats, old stats) stats {
239	new.UptimeSeconds = mergeValue(new.UptimeSeconds, old.UptimeSeconds)
240	new.PendingSessionKeys = mergeValue(new.PendingSessionKeys, old.PendingSessionKeys)
241	new.ActiveSessions = mergeValue(new.ActiveSessions, old.ActiveSessions)
242	new.Connections = mergeValue(new.Connections, old.Connections)
243	new.Proxies = mergeValue(new.Proxies, old.Proxies)
244	new.BytesProxied = mergeValue(new.BytesProxied, old.BytesProxied)
245	new.GoRoutines = mergeValue(new.GoRoutines, old.GoRoutines)
246	new.Options.SessionRate = mergeValue(new.Options.SessionRate, old.Options.SessionRate)
247	new.Options.GlobalRate = mergeValue(new.Options.GlobalRate, old.Options.GlobalRate)
248	return new
249}
250
// mergeValue chooses which of a freshly scraped value and the previously
// exported one to keep. It returns old only when new is lower than old but
// still above 99% of it; in every other case it returns new.
func mergeValue(new, old int) int {
	switch {
	case new >= old:
		// Normal increase (or no change).
		return new
	case float64(new) > 0.99*float64(old):
		// Slight backward movement: hold the previous value.
		return old
	default:
		// Large drop: treat as a reset (relay restart).
		return new
	}
}
260