1package tsa 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "net/http" 8 "net/url" 9 "strings" 10 "time" 11 12 "code.cloudfoundry.org/clock" 13 "code.cloudfoundry.org/lager" 14 "code.cloudfoundry.org/lager/lagerctx" 15 "github.com/concourse/baggageclaim" 16 "github.com/concourse/concourse/atc" 17 "github.com/concourse/concourse/atc/worker/gclient" 18 "github.com/tedsuo/rata" 19) 20 21//go:generate counterfeiter . EndpointPicker 22type EndpointPicker interface { 23 Pick() *rata.RequestGenerator 24} 25 26type Heartbeater struct { 27 clock clock.Clock 28 interval time.Duration 29 cprInterval time.Duration 30 31 gardenClient gclient.Client 32 baggageclaimClient baggageclaim.Client 33 34 atcEndpointPicker EndpointPicker 35 httpClient *http.Client 36 37 registration atc.Worker 38 eventWriter EventWriter 39} 40 41func NewHeartbeater( 42 clock clock.Clock, 43 interval time.Duration, 44 cprInterval time.Duration, 45 gardenClient gclient.Client, 46 baggageclaimClient baggageclaim.Client, 47 atcEndpointPicker EndpointPicker, 48 httpClient *http.Client, 49 worker atc.Worker, 50 eventWriter EventWriter, 51) *Heartbeater { 52 return &Heartbeater{ 53 clock: clock, 54 interval: interval, 55 cprInterval: cprInterval, 56 57 gardenClient: gardenClient, 58 baggageclaimClient: baggageclaimClient, 59 60 atcEndpointPicker: atcEndpointPicker, 61 httpClient: httpClient, 62 63 registration: worker, 64 eventWriter: eventWriter, 65 } 66} 67 68func (heartbeater *Heartbeater) Heartbeat(ctx context.Context) error { 69 logger := lagerctx.FromContext(ctx) 70 71 logger.Info("start") 72 defer logger.Info("done") 73 74 for !heartbeater.register(logger.Session("register")) { 75 select { 76 case <-heartbeater.clock.NewTimer(time.Second).C(): 77 case <-ctx.Done(): 78 return nil 79 } 80 } 81 82 currentInterval := heartbeater.interval 83 84 for { 85 select { 86 case <-ctx.Done(): 87 return nil 88 89 case <-heartbeater.clock.NewTimer(currentInterval).C(): 90 status := heartbeater.heartbeat(logger.Session("heartbeat")) 91 switch status { 92 case HeartbeatStatusGoneAway: 93 return nil 94 case HeartbeatStatusLanded: 95 return nil 96 case HeartbeatStatusHealthy: 97 currentInterval = heartbeater.interval 98 default: 99 currentInterval = heartbeater.cprInterval 100 } 101 } 102 } 103 104} 105 106func (heartbeater *Heartbeater) register(logger lager.Logger) bool { 107 heartbeatData := lager.Data{ 108 "worker-platform": heartbeater.registration.Platform, 109 "worker-address": heartbeater.registration.GardenAddr, 110 "worker-tags": strings.Join(heartbeater.registration.Tags, ","), 111 } 112 113 logger.Info("start", heartbeatData) 114 defer logger.Info("done", heartbeatData) 115 116 registration, ok := heartbeater.pingWorker(logger) 117 if !ok { 118 return false 119 } 120 121 payload, err := json.Marshal(registration) 122 if err != nil { 123 logger.Error("failed-to-marshal-registration", err) 124 return false 125 } 126 127 request, err := heartbeater.atcEndpointPicker.Pick().CreateRequest(atc.RegisterWorker, nil, bytes.NewBuffer(payload)) 128 if err != nil { 129 logger.Error("failed-to-construct-request", err) 130 return false 131 } 132 133 request.URL.RawQuery = url.Values{ 134 "ttl": []string{heartbeater.ttl().String()}, 135 }.Encode() 136 137 response, err := heartbeater.httpClient.Do(request) 138 if err != nil { 139 logger.Error("failed-to-register", err) 140 return false 141 } 142 143 defer response.Body.Close() 144 145 if response.StatusCode != http.StatusOK { 146 logger.Error("bad-response", nil, lager.Data{ 147 "status-code": response.StatusCode, 148 }) 149 150 return false 151 } 152 153 err = heartbeater.eventWriter.Registered() 154 if err != nil { 155 logger.Error("failed-to-emit-registered-event", err) 156 return true 157 } 158 159 return true 160} 161 162type HeartbeatStatus int 163 164const ( 165 HeartbeatStatusUnhealthy = iota 166 HeartbeatStatusLanded 167 HeartbeatStatusGoneAway 168 HeartbeatStatusHealthy 169) 170 171func (heartbeater *Heartbeater) heartbeat(logger lager.Logger) HeartbeatStatus { 172 heartbeatData := lager.Data{ 173 "worker-platform": heartbeater.registration.Platform, 174 "worker-address": heartbeater.registration.GardenAddr, 175 "worker-tags": strings.Join(heartbeater.registration.Tags, ","), 176 } 177 178 logger.Info("start", heartbeatData) 179 defer logger.Info("done", heartbeatData) 180 181 registration, ok := heartbeater.pingWorker(logger) 182 if !ok { 183 return HeartbeatStatusUnhealthy 184 } 185 186 payload, err := json.Marshal(registration) 187 if err != nil { 188 logger.Error("failed-to-marshal-registration", err) 189 return HeartbeatStatusUnhealthy 190 } 191 192 request, err := heartbeater.atcEndpointPicker.Pick().CreateRequest(atc.HeartbeatWorker, rata.Params{ 193 "worker_name": heartbeater.registration.Name, 194 }, bytes.NewBuffer(payload)) 195 if err != nil { 196 logger.Error("failed-to-construct-request", err) 197 return HeartbeatStatusUnhealthy 198 } 199 200 request.URL.RawQuery = url.Values{ 201 "ttl": []string{heartbeater.ttl().String()}, 202 }.Encode() 203 204 response, err := heartbeater.httpClient.Do(request) 205 if err != nil { 206 logger.Error("failed-to-heartbeat", err) 207 return HeartbeatStatusUnhealthy 208 } 209 210 defer response.Body.Close() 211 212 if response.StatusCode == http.StatusNotFound { 213 logger.Debug("worker-has-gone-away") 214 return HeartbeatStatusGoneAway 215 } 216 217 if response.StatusCode != http.StatusOK { 218 logger.Error("bad-response", nil, lager.Data{ 219 "status-code": response.StatusCode, 220 }) 221 222 return HeartbeatStatusUnhealthy 223 } 224 225 err = heartbeater.eventWriter.Heartbeated() 226 if err != nil { 227 logger.Error("failed-to-emit-heartbeated-event", err) 228 } 229 230 var workerInfo atc.Worker 231 err = json.NewDecoder(response.Body).Decode(&workerInfo) 232 if err != nil { 233 logger.Error("failed-to-decode-response", err) 234 return HeartbeatStatusUnhealthy 235 } 236 237 if workerInfo.State == "landed" { 238 logger.Debug("worker-landed") 239 return HeartbeatStatusLanded 240 } 241 242 return HeartbeatStatusHealthy 243} 244 245func (heartbeater *Heartbeater) pingWorker(logger lager.Logger) (atc.Worker, bool) { 246 registration := heartbeater.registration 247 248 beforeGarden := time.Now() 249 250 healthy := true 251 252 containers, err := heartbeater.gardenClient.Containers(nil) 253 if err != nil { 254 logger.Error("failed-to-fetch-containers", err) 255 healthy = false 256 } 257 258 afterGarden := time.Now() 259 260 beforeBaggageclaim := time.Now() 261 262 volumes, err := heartbeater.baggageclaimClient.ListVolumes(logger.Session("list-volumes"), nil) 263 if err != nil { 264 logger.Error("failed-to-list-volumes", err) 265 healthy = false 266 } 267 268 afterBaggageclaim := time.Now() 269 270 durationData := lager.Data{ 271 "garden-took": afterGarden.Sub(beforeGarden).String(), 272 "baggageclaim-took": afterBaggageclaim.Sub(beforeBaggageclaim).String(), 273 } 274 275 if healthy { 276 logger.Debug("reached-worker", durationData) 277 } else { 278 logger.Info("failed-to-reach-worker", durationData) 279 return atc.Worker{}, false 280 } 281 282 registration.ActiveContainers = len(containers) 283 registration.ActiveVolumes = len(volumes) 284 285 return registration, true 286} 287 288func (heartbeater *Heartbeater) ttl() time.Duration { 289 return heartbeater.interval * 2 290} 291