1package worker 2 3import ( 4 "github.com/concourse/concourse/atc/policy" 5 "net/http" 6 "time" 7 8 "code.cloudfoundry.org/clock" 9 "code.cloudfoundry.org/lager" 10 bclient "github.com/concourse/baggageclaim/client" 11 "github.com/concourse/concourse/atc/db" 12 "github.com/concourse/concourse/atc/db/lock" 13 "github.com/concourse/concourse/atc/worker/gclient" 14 "github.com/concourse/concourse/atc/worker/transport" 15 "github.com/concourse/retryhttp" 16 "github.com/cppforlife/go-semi-semantic/version" 17) 18 19type dbWorkerProvider struct { 20 lockFactory lock.LockFactory 21 retryBackOffFactory retryhttp.BackOffFactory 22 resourceFetcher Fetcher 23 imageFactory ImageFactory 24 dbResourceCacheFactory db.ResourceCacheFactory 25 dbResourceConfigFactory db.ResourceConfigFactory 26 dbWorkerBaseResourceTypeFactory db.WorkerBaseResourceTypeFactory 27 dbTaskCacheFactory db.TaskCacheFactory 28 dbWorkerTaskCacheFactory db.WorkerTaskCacheFactory 29 dbVolumeRepository db.VolumeRepository 30 dbTeamFactory db.TeamFactory 31 dbWorkerFactory db.WorkerFactory 32 workerVersion version.Version 33 baggageclaimResponseHeaderTimeout time.Duration 34 gardenRequestTimeout time.Duration 35 policyChecker *policy.Checker 36} 37 38func NewDBWorkerProvider( 39 lockFactory lock.LockFactory, 40 retryBackOffFactory retryhttp.BackOffFactory, 41 fetcher Fetcher, 42 imageFactory ImageFactory, 43 dbResourceCacheFactory db.ResourceCacheFactory, 44 dbResourceConfigFactory db.ResourceConfigFactory, 45 dbWorkerBaseResourceTypeFactory db.WorkerBaseResourceTypeFactory, 46 dbTaskCacheFactory db.TaskCacheFactory, 47 dbWorkerTaskCacheFactory db.WorkerTaskCacheFactory, 48 dbVolumeRepository db.VolumeRepository, 49 dbTeamFactory db.TeamFactory, 50 workerFactory db.WorkerFactory, 51 workerVersion version.Version, 52 baggageclaimResponseHeaderTimeout, gardenRequestTimeout time.Duration, 53 policyChecker *policy.Checker, 54) WorkerProvider { 55 return &dbWorkerProvider{ 56 lockFactory: lockFactory, 57 retryBackOffFactory: retryBackOffFactory, 58 resourceFetcher: fetcher, 59 imageFactory: imageFactory, 60 dbResourceCacheFactory: dbResourceCacheFactory, 61 dbResourceConfigFactory: dbResourceConfigFactory, 62 dbWorkerBaseResourceTypeFactory: dbWorkerBaseResourceTypeFactory, 63 dbTaskCacheFactory: dbTaskCacheFactory, 64 dbWorkerTaskCacheFactory: dbWorkerTaskCacheFactory, 65 dbVolumeRepository: dbVolumeRepository, 66 dbTeamFactory: dbTeamFactory, 67 dbWorkerFactory: workerFactory, 68 workerVersion: workerVersion, 69 baggageclaimResponseHeaderTimeout: baggageclaimResponseHeaderTimeout, 70 gardenRequestTimeout: gardenRequestTimeout, 71 policyChecker: policyChecker, 72 } 73} 74 75func (provider *dbWorkerProvider) RunningWorkers(logger lager.Logger) ([]Worker, error) { 76 savedWorkers, err := provider.dbWorkerFactory.Workers() 77 if err != nil { 78 return nil, err 79 } 80 81 buildContainersCountPerWorker, err := provider.dbWorkerFactory.BuildContainersCountPerWorker() 82 if err != nil { 83 return nil, err 84 } 85 86 workers := []Worker{} 87 88 for _, savedWorker := range savedWorkers { 89 if savedWorker.State() != db.WorkerStateRunning { 90 continue 91 } 92 93 workerLog := logger.Session("running-worker") 94 worker := provider.NewGardenWorker( 95 workerLog, 96 savedWorker, 97 buildContainersCountPerWorker[savedWorker.Name()], 98 ) 99 if !worker.IsVersionCompatible(workerLog, provider.workerVersion) { 100 continue 101 } 102 103 workers = append(workers, worker) 104 } 105 106 return workers, nil 107} 108 109func (provider *dbWorkerProvider) FindWorkersForContainerByOwner( 110 logger lager.Logger, 111 owner db.ContainerOwner, 112) ([]Worker, error) { 113 logger = logger.Session("worker-for-container") 114 dbWorkers, err := provider.dbWorkerFactory.FindWorkersForContainerByOwner(owner) 115 if err != nil { 116 return nil, err 117 } 118 119 var workers []Worker 120 for _, w := range dbWorkers { 121 worker := provider.NewGardenWorker(logger, w, 0) 122 if worker.IsVersionCompatible(logger, provider.workerVersion) { 123 workers = append(workers, worker) 124 } 125 } 126 127 return workers, nil 128} 129 130func (provider *dbWorkerProvider) FindWorkerForContainer( 131 logger lager.Logger, 132 teamID int, 133 handle string, 134) (Worker, bool, error) { 135 logger = logger.Session("worker-for-container") 136 team := provider.dbTeamFactory.GetByID(teamID) 137 138 dbWorker, found, err := team.FindWorkerForContainer(handle) 139 if err != nil { 140 return nil, false, err 141 } 142 143 if !found { 144 return nil, false, nil 145 } 146 147 worker := provider.NewGardenWorker(logger, dbWorker, 0) 148 if !worker.IsVersionCompatible(logger, provider.workerVersion) { 149 return nil, false, nil 150 } 151 return worker, true, err 152} 153 154func (provider *dbWorkerProvider) FindWorkerForVolume( 155 logger lager.Logger, 156 teamID int, 157 handle string, 158) (Worker, bool, error) { 159 logger = logger.Session("worker-for-volume") 160 team := provider.dbTeamFactory.GetByID(teamID) 161 162 dbWorker, found, err := team.FindWorkerForVolume(handle) 163 if err != nil { 164 return nil, false, err 165 } 166 167 if !found { 168 return nil, false, nil 169 } 170 171 worker := provider.NewGardenWorker(logger, dbWorker, 0) 172 if !worker.IsVersionCompatible(logger, provider.workerVersion) { 173 return nil, false, nil 174 } 175 return worker, true, err 176} 177 178func (provider *dbWorkerProvider) NewGardenWorker(logger lager.Logger, savedWorker db.Worker, buildContainersCount int) Worker { 179 gcf := gclient.NewGardenClientFactory( 180 provider.dbWorkerFactory, 181 logger.Session("garden-connection"), 182 savedWorker.Name(), 183 savedWorker.GardenAddr(), 184 provider.retryBackOffFactory, 185 provider.gardenRequestTimeout, 186 ) 187 188 gClient := gcf.NewClient() 189 190 bClient := bclient.New("", transport.NewBaggageclaimRoundTripper( 191 savedWorker.Name(), 192 savedWorker.BaggageclaimURL(), 193 provider.dbWorkerFactory, 194 &http.Transport{ 195 DisableKeepAlives: true, 196 ResponseHeaderTimeout: provider.baggageclaimResponseHeaderTimeout, 197 }, 198 )) 199 200 volumeClient := NewVolumeClient( 201 bClient, 202 savedWorker, 203 clock.NewClock(), 204 provider.lockFactory, 205 provider.dbVolumeRepository, 206 provider.dbWorkerBaseResourceTypeFactory, 207 provider.dbTaskCacheFactory, 208 provider.dbWorkerTaskCacheFactory, 209 ) 210 211 return NewGardenWorker( 212 gClient, 213 provider.dbVolumeRepository, 214 volumeClient, 215 provider.imageFactory, 216 provider.resourceFetcher, 217 provider.dbTeamFactory, 218 savedWorker, 219 provider.dbResourceCacheFactory, 220 buildContainersCount, 221 provider.policyChecker, 222 ) 223} 224