1// Copyright (C) 2018 Audrius Butkevicius and Contributors (see the CONTRIBUTORS file). 2 3package main 4 5import ( 6 "encoding/json" 7 "net" 8 "net/http" 9 "os" 10 "time" 11 12 "github.com/prometheus/client_golang/prometheus" 13 "github.com/syncthing/syncthing/lib/sync" 14) 15 16func init() { 17 processCollectorOpts := prometheus.ProcessCollectorOpts{ 18 Namespace: "syncthing_relaypoolsrv", 19 PidFn: func() (int, error) { 20 return os.Getpid(), nil 21 }, 22 } 23 24 prometheus.MustRegister( 25 prometheus.NewProcessCollector(processCollectorOpts), 26 ) 27} 28 29var ( 30 statusClient = http.Client{ 31 Timeout: 5 * time.Second, 32 } 33 34 apiRequestsTotal = makeCounter("api_requests_total", "Number of API requests.", "type", "result") 35 apiRequestsSeconds = makeSummary("api_requests_seconds", "Latency of API requests.", "type") 36 37 relayTestsTotal = makeCounter("tests_total", "Number of relay tests.", "result") 38 relayTestActionsSeconds = makeSummary("test_actions_seconds", "Latency of relay test actions.", "type") 39 40 locationLookupSeconds = makeSummary("location_lookup_seconds", "Latency of location lookups.").WithLabelValues() 41 42 metricsRequestsSeconds = makeSummary("metrics_requests_seconds", "Latency of metric requests.").WithLabelValues() 43 scrapeSeconds = makeSummary("relay_scrape_seconds", "Latency of metric scrapes from remote relays.", "result") 44 45 relayUptime = makeGauge("relay_uptime", "Uptime of relay", "relay") 46 relayPendingSessionKeys = makeGauge("relay_pending_session_keys", "Number of pending session keys (two keys per session, one per each side of the connection)", "relay") 47 relayActiveSessions = makeGauge("relay_active_sessions", "Number of sessions that are happening, a session contains two parties", "relay") 48 relayConnections = makeGauge("relay_connections", "Number of devices connected to the relay", "relay") 49 relayProxies = makeGauge("relay_proxies", "Number of active proxy routines sending data between peers (two proxies per session, one for each way)", "relay") 50 relayBytesProxied = makeGauge("relay_bytes_proxied", "Number of bytes proxied by the relay", "relay") 51 relayGoRoutines = makeGauge("relay_go_routines", "Number of Go routines in the process", "relay") 52 relaySessionRate = makeGauge("relay_session_rate", "Rate applied per session", "relay") 53 relayGlobalRate = makeGauge("relay_global_rate", "Global rate applied on the whole relay", "relay") 54 relayBuildInfo = makeGauge("relay_build_info", "Build information about a relay", "relay", "go_version", "go_os", "go_arch") 55 relayLocationInfo = makeGauge("relay_location_info", "Location information about a relay", "relay", "city", "country", "continent") 56 57 lastStats = make(map[string]stats) 58) 59 60func makeGauge(name string, help string, labels ...string) *prometheus.GaugeVec { 61 gauge := prometheus.NewGaugeVec( 62 prometheus.GaugeOpts{ 63 Namespace: "syncthing", 64 Subsystem: "relaypoolsrv", 65 Name: name, 66 Help: help, 67 }, 68 labels, 69 ) 70 prometheus.MustRegister(gauge) 71 return gauge 72} 73 74func makeSummary(name string, help string, labels ...string) *prometheus.SummaryVec { 75 summary := prometheus.NewSummaryVec( 76 prometheus.SummaryOpts{ 77 Namespace: "syncthing", 78 Subsystem: "relaypoolsrv", 79 Name: name, 80 Help: help, 81 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, 82 }, 83 labels, 84 ) 85 prometheus.MustRegister(summary) 86 return summary 87} 88 89func makeCounter(name string, help string, labels ...string) *prometheus.CounterVec { 90 counter := prometheus.NewCounterVec( 91 prometheus.CounterOpts{ 92 Namespace: "syncthing", 93 Subsystem: "relaypoolsrv", 94 Name: name, 95 Help: help, 96 }, 97 labels, 98 ) 99 prometheus.MustRegister(counter) 100 return counter 101} 102 103func statsRefresher(interval time.Duration) { 104 ticker := time.NewTicker(interval) 105 for range ticker.C { 106 refreshStats() 107 } 108} 109 110type statsFetchResult struct { 111 relay *relay 112 stats *stats 113} 114 115func refreshStats() { 116 mut.RLock() 117 relays := append(permanentRelays, knownRelays...) 118 mut.RUnlock() 119 120 now := time.Now() 121 wg := sync.NewWaitGroup() 122 123 results := make(chan statsFetchResult, len(relays)) 124 for _, rel := range relays { 125 wg.Add(1) 126 go func(rel *relay) { 127 t0 := time.Now() 128 stats := fetchStats(rel) 129 duration := time.Since(t0).Seconds() 130 result := "success" 131 if stats == nil { 132 result = "failed" 133 } 134 scrapeSeconds.WithLabelValues(result).Observe(duration) 135 136 results <- statsFetchResult{ 137 relay: rel, 138 stats: fetchStats(rel), 139 } 140 wg.Done() 141 }(rel) 142 } 143 144 wg.Wait() 145 close(results) 146 147 mut.Lock() 148 relayBuildInfo.Reset() 149 relayLocationInfo.Reset() 150 for result := range results { 151 result.relay.StatsRetrieved = now 152 result.relay.Stats = result.stats 153 if result.stats == nil { 154 deleteMetrics(result.relay.uri.Host) 155 } else { 156 updateMetrics(result.relay.uri.Host, *result.stats, result.relay.Location) 157 } 158 } 159 mut.Unlock() 160} 161 162func fetchStats(relay *relay) *stats { 163 statusAddr := relay.uri.Query().Get("statusAddr") 164 if statusAddr == "" { 165 statusAddr = ":22070" 166 } 167 168 statusHost, statusPort, err := net.SplitHostPort(statusAddr) 169 if err != nil { 170 return nil 171 } 172 173 if statusHost == "" { 174 if host, _, err := net.SplitHostPort(relay.uri.Host); err != nil { 175 return nil 176 } else { 177 statusHost = host 178 } 179 } 180 181 url := "http://" + net.JoinHostPort(statusHost, statusPort) + "/status" 182 183 response, err := statusClient.Get(url) 184 if err != nil { 185 return nil 186 } 187 188 var stats stats 189 190 if json.NewDecoder(response.Body).Decode(&stats); err != nil { 191 return nil 192 } 193 return &stats 194} 195 196func updateMetrics(host string, stats stats, location location) { 197 if stats.GoVersion != "" || stats.GoOS != "" || stats.GoArch != "" { 198 relayBuildInfo.WithLabelValues(host, stats.GoVersion, stats.GoOS, stats.GoArch).Add(1) 199 } 200 if location.City != "" || location.Country != "" || location.Continent != "" { 201 relayLocationInfo.WithLabelValues(host, location.City, location.Country, location.Continent).Add(1) 202 } 203 204 if lastStat, ok := lastStats[host]; ok { 205 stats = mergeStats(stats, lastStat) 206 } 207 208 relayUptime.WithLabelValues(host).Set(float64(stats.UptimeSeconds)) 209 relayPendingSessionKeys.WithLabelValues(host).Set(float64(stats.PendingSessionKeys)) 210 relayActiveSessions.WithLabelValues(host).Set(float64(stats.ActiveSessions)) 211 relayConnections.WithLabelValues(host).Set(float64(stats.Connections)) 212 relayProxies.WithLabelValues(host).Set(float64(stats.Proxies)) 213 relayBytesProxied.WithLabelValues(host).Set(float64(stats.BytesProxied)) 214 relayGoRoutines.WithLabelValues(host).Set(float64(stats.GoRoutines)) 215 relaySessionRate.WithLabelValues(host).Set(float64(stats.Options.SessionRate)) 216 relayGlobalRate.WithLabelValues(host).Set(float64(stats.Options.GlobalRate)) 217 lastStats[host] = stats 218} 219 220func deleteMetrics(host string) { 221 relayUptime.DeleteLabelValues(host) 222 relayPendingSessionKeys.DeleteLabelValues(host) 223 relayActiveSessions.DeleteLabelValues(host) 224 relayConnections.DeleteLabelValues(host) 225 relayProxies.DeleteLabelValues(host) 226 relayBytesProxied.DeleteLabelValues(host) 227 relayGoRoutines.DeleteLabelValues(host) 228 relaySessionRate.DeleteLabelValues(host) 229 relayGlobalRate.DeleteLabelValues(host) 230 delete(lastStats, host) 231} 232 233// Due to some unexplainable behaviour, some of the numbers sometimes travel slightly backwards (by less than 1%) 234// This happens between scrapes, which is 30s, so this can't be a race. 235// This causes prometheus to assume a "rate reset", hence causes phenomenal spikes. 236// One of the number that moves backwards is BytesProxied, which atomically increments a counter with numeric value 237// returned by net.Conn.Read(). I don't think that can return a negative value, so I have no idea what's going on. 238func mergeStats(new stats, old stats) stats { 239 new.UptimeSeconds = mergeValue(new.UptimeSeconds, old.UptimeSeconds) 240 new.PendingSessionKeys = mergeValue(new.PendingSessionKeys, old.PendingSessionKeys) 241 new.ActiveSessions = mergeValue(new.ActiveSessions, old.ActiveSessions) 242 new.Connections = mergeValue(new.Connections, old.Connections) 243 new.Proxies = mergeValue(new.Proxies, old.Proxies) 244 new.BytesProxied = mergeValue(new.BytesProxied, old.BytesProxied) 245 new.GoRoutines = mergeValue(new.GoRoutines, old.GoRoutines) 246 new.Options.SessionRate = mergeValue(new.Options.SessionRate, old.Options.SessionRate) 247 new.Options.GlobalRate = mergeValue(new.Options.GlobalRate, old.Options.GlobalRate) 248 return new 249} 250 251func mergeValue(new, old int) int { 252 if new >= old { 253 return new // normal increase 254 } 255 if float64(new) > 0.99*float64(old) { 256 return old // slight backward movement 257 } 258 return new // reset (relay restart) 259} 260