1package tsa
2
3import (
4	"bytes"
5	"context"
6	"encoding/json"
7	"net/http"
8	"net/url"
9	"strings"
10	"time"
11
12	"code.cloudfoundry.org/clock"
13	"code.cloudfoundry.org/lager"
14	"code.cloudfoundry.org/lager/lagerctx"
15	"github.com/concourse/baggageclaim"
16	"github.com/concourse/concourse/atc"
17	"github.com/concourse/concourse/atc/worker/gclient"
18	"github.com/tedsuo/rata"
19)
20
// EndpointPicker chooses the ATC endpoint that a request is sent to.
// Heartbeater calls Pick afresh for every register and heartbeat request, so
// an implementation may (presumably) rotate or balance between several ATCs.
//
//go:generate counterfeiter . EndpointPicker
type EndpointPicker interface {
	Pick() *rata.RequestGenerator
}
25
// Heartbeater keeps a worker registered with the ATC. It registers the worker
// once and then sends periodic heartbeats that include the worker's current
// container and volume counts (see Heartbeat).
type Heartbeater struct {
	clock       clock.Clock   // drives the registration-retry and heartbeat timers
	interval    time.Duration // wait between heartbeats while healthy; ttl() is twice this
	cprInterval time.Duration // wait after a heartbeat that did not come back healthy

	gardenClient       gclient.Client      // queried for the worker's containers
	baggageclaimClient baggageclaim.Client // queried for the worker's volumes

	atcEndpointPicker EndpointPicker // picks the ATC endpoint for each request
	httpClient        *http.Client   // sends the register/heartbeat requests

	registration atc.Worker  // base worker payload sent to the ATC
	eventWriter  EventWriter // notified after the ATC accepts a register/heartbeat
}
40
// NewHeartbeater builds a Heartbeater for the given worker registration.
//
// interval is the pause between heartbeats while the worker is healthy, and
// cprInterval is the pause used after a heartbeat that was not healthy. Every
// request to the ATC goes through httpClient to the endpoint chosen by
// atcEndpointPicker. Successful registrations and heartbeats are reported to
// eventWriter.
func NewHeartbeater(
	clock clock.Clock,
	interval time.Duration,
	cprInterval time.Duration,
	gardenClient gclient.Client,
	baggageclaimClient baggageclaim.Client,
	atcEndpointPicker EndpointPicker,
	httpClient *http.Client,
	worker atc.Worker,
	eventWriter EventWriter,
) *Heartbeater {
	hb := new(Heartbeater)

	hb.clock = clock
	hb.interval, hb.cprInterval = interval, cprInterval

	hb.gardenClient = gardenClient
	hb.baggageclaimClient = baggageclaimClient

	hb.atcEndpointPicker = atcEndpointPicker
	hb.httpClient = httpClient

	hb.registration = worker
	hb.eventWriter = eventWriter

	return hb
}
67
// Heartbeat registers the worker with the ATC and then keeps it alive.
//
// Registration is retried once per second until it succeeds. After that, a
// heartbeat is sent every interval while the worker is healthy, and every
// cprInterval after a heartbeat that was not healthy. The loop ends when ctx is
// cancelled or when the ATC reports that the worker has gone away or landed.
// It always returns nil.
func (heartbeater *Heartbeater) Heartbeat(ctx context.Context) error {
	logger := lagerctx.FromContext(ctx)

	logger.Info("start")
	defer logger.Info("done")

	for !heartbeater.register(logger.Session("register")) {
		retry := heartbeater.clock.NewTimer(time.Second)

		select {
		case <-retry.C():
		case <-ctx.Done():
			// Stop the pending timer so it does not outlive this call.
			retry.Stop()
			return nil
		}
	}

	currentInterval := heartbeater.interval

	for {
		timer := heartbeater.clock.NewTimer(currentInterval)

		select {
		case <-ctx.Done():
			// Same as above: don't leave the timer armed after we return.
			timer.Stop()
			return nil

		case <-timer.C():
			switch heartbeater.heartbeat(logger.Session("heartbeat")) {
			case HeartbeatStatusGoneAway, HeartbeatStatusLanded:
				return nil
			case HeartbeatStatusHealthy:
				currentInterval = heartbeater.interval
			default:
				currentInterval = heartbeater.cprInterval
			}
		}
	}
}
105
// register sends the worker's registration to the ATC's RegisterWorker
// endpoint, with a ttl query parameter. The registration is refreshed with the
// current container and volume counts from pingWorker.
//
// It returns true only if the worker was reachable, the request was built and
// sent, and the ATC answered 200 OK. A failure to emit the "registered" event
// afterwards is logged but does not turn the result into a failure.
func (heartbeater *Heartbeater) register(logger lager.Logger) bool {
	workerData := lager.Data{
		"worker-platform": heartbeater.registration.Platform,
		"worker-address":  heartbeater.registration.GardenAddr,
		"worker-tags":     strings.Join(heartbeater.registration.Tags, ","),
	}

	logger.Info("start", workerData)
	defer logger.Info("done", workerData)

	current, reachable := heartbeater.pingWorker(logger)
	if !reachable {
		return false
	}

	body, err := json.Marshal(current)
	if err != nil {
		logger.Error("failed-to-marshal-registration", err)
		return false
	}

	generator := heartbeater.atcEndpointPicker.Pick()
	req, err := generator.CreateRequest(atc.RegisterWorker, nil, bytes.NewBuffer(body))
	if err != nil {
		logger.Error("failed-to-construct-request", err)
		return false
	}

	query := url.Values{}
	query.Set("ttl", heartbeater.ttl().String())
	req.URL.RawQuery = query.Encode()

	resp, err := heartbeater.httpClient.Do(req)
	if err != nil {
		logger.Error("failed-to-register", err)
		return false
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		logger.Error("bad-response", nil, lager.Data{"status-code": resp.StatusCode})
		return false
	}

	if err := heartbeater.eventWriter.Registered(); err != nil {
		// The ATC already accepted the registration; a missing event is only logged.
		logger.Error("failed-to-emit-registered-event", err)
	}

	return true
}
161
// HeartbeatStatus is the outcome of a single heartbeat attempt.
type HeartbeatStatus int

// Possible heartbeat outcomes. The first constant is explicitly typed, so every
// constant in the block is a HeartbeatStatus rather than an untyped int.
const (
	// HeartbeatStatusUnhealthy means the heartbeat could not be completed: the
	// worker was unreachable, the request failed, or the response was unexpected.
	HeartbeatStatusUnhealthy HeartbeatStatus = iota
	// HeartbeatStatusLanded means the ATC reported the worker's state as landed.
	HeartbeatStatusLanded
	// HeartbeatStatusGoneAway means the ATC answered 404 Not Found for the worker.
	HeartbeatStatusGoneAway
	// HeartbeatStatusHealthy means the heartbeat was accepted and the worker is not landed.
	HeartbeatStatusHealthy
)
170
// heartbeat sends one heartbeat for the worker to the ATC's HeartbeatWorker
// endpoint, with a ttl query parameter, and classifies the result:
//   - HeartbeatStatusUnhealthy if the worker can't be reached, the request
//     can't be built or sent, the ATC answers with a status other than 200 or
//     404, or the response body can't be decoded;
//   - HeartbeatStatusGoneAway if the ATC answers 404 Not Found;
//   - HeartbeatStatusLanded if the decoded worker's State is "landed";
//   - HeartbeatStatusHealthy otherwise.
//
// The "heartbeated" event is emitted after any 200 OK, before the body is
// decoded. A failure to emit it is only logged.
func (heartbeater *Heartbeater) heartbeat(logger lager.Logger) HeartbeatStatus {
	workerData := lager.Data{
		"worker-platform": heartbeater.registration.Platform,
		"worker-address":  heartbeater.registration.GardenAddr,
		"worker-tags":     strings.Join(heartbeater.registration.Tags, ","),
	}

	logger.Info("start", workerData)
	defer logger.Info("done", workerData)

	current, reachable := heartbeater.pingWorker(logger)
	if !reachable {
		return HeartbeatStatusUnhealthy
	}

	body, err := json.Marshal(current)
	if err != nil {
		logger.Error("failed-to-marshal-registration", err)
		return HeartbeatStatusUnhealthy
	}

	generator := heartbeater.atcEndpointPicker.Pick()
	params := rata.Params{"worker_name": heartbeater.registration.Name}
	req, err := generator.CreateRequest(atc.HeartbeatWorker, params, bytes.NewBuffer(body))
	if err != nil {
		logger.Error("failed-to-construct-request", err)
		return HeartbeatStatusUnhealthy
	}

	query := url.Values{}
	query.Set("ttl", heartbeater.ttl().String())
	req.URL.RawQuery = query.Encode()

	resp, err := heartbeater.httpClient.Do(req)
	if err != nil {
		logger.Error("failed-to-heartbeat", err)
		return HeartbeatStatusUnhealthy
	}
	defer resp.Body.Close()

	switch resp.StatusCode {
	case http.StatusOK:
		// Accepted; the worker's state is read from the body below.
	case http.StatusNotFound:
		logger.Debug("worker-has-gone-away")
		return HeartbeatStatusGoneAway
	default:
		logger.Error("bad-response", nil, lager.Data{"status-code": resp.StatusCode})
		return HeartbeatStatusUnhealthy
	}

	if err := heartbeater.eventWriter.Heartbeated(); err != nil {
		logger.Error("failed-to-emit-heartbeated-event", err)
	}

	var info atc.Worker
	if err := json.NewDecoder(resp.Body).Decode(&info); err != nil {
		logger.Error("failed-to-decode-response", err)
		return HeartbeatStatusUnhealthy
	}

	if info.State != "landed" {
		return HeartbeatStatusHealthy
	}

	logger.Debug("worker-landed")
	return HeartbeatStatusLanded
}
244
245func (heartbeater *Heartbeater) pingWorker(logger lager.Logger) (atc.Worker, bool) {
246	registration := heartbeater.registration
247
248	beforeGarden := time.Now()
249
250	healthy := true
251
252	containers, err := heartbeater.gardenClient.Containers(nil)
253	if err != nil {
254		logger.Error("failed-to-fetch-containers", err)
255		healthy = false
256	}
257
258	afterGarden := time.Now()
259
260	beforeBaggageclaim := time.Now()
261
262	volumes, err := heartbeater.baggageclaimClient.ListVolumes(logger.Session("list-volumes"), nil)
263	if err != nil {
264		logger.Error("failed-to-list-volumes", err)
265		healthy = false
266	}
267
268	afterBaggageclaim := time.Now()
269
270	durationData := lager.Data{
271		"garden-took":       afterGarden.Sub(beforeGarden).String(),
272		"baggageclaim-took": afterBaggageclaim.Sub(beforeBaggageclaim).String(),
273	}
274
275	if healthy {
276		logger.Debug("reached-worker", durationData)
277	} else {
278		logger.Info("failed-to-reach-worker", durationData)
279		return atc.Worker{}, false
280	}
281
282	registration.ActiveContainers = len(containers)
283	registration.ActiveVolumes = len(volumes)
284
285	return registration, true
286}
287
288func (heartbeater *Heartbeater) ttl() time.Duration {
289	return heartbeater.interval * 2
290}
291