1package worker
2
3import (
4	"github.com/concourse/concourse/atc/policy"
5	"net/http"
6	"time"
7
8	"code.cloudfoundry.org/clock"
9	"code.cloudfoundry.org/lager"
10	bclient "github.com/concourse/baggageclaim/client"
11	"github.com/concourse/concourse/atc/db"
12	"github.com/concourse/concourse/atc/db/lock"
13	"github.com/concourse/concourse/atc/worker/gclient"
14	"github.com/concourse/concourse/atc/worker/transport"
15	"github.com/concourse/retryhttp"
16	"github.com/cppforlife/go-semi-semantic/version"
17)
18
19type dbWorkerProvider struct {
20	lockFactory                       lock.LockFactory
21	retryBackOffFactory               retryhttp.BackOffFactory
22	resourceFetcher                   Fetcher
23	imageFactory                      ImageFactory
24	dbResourceCacheFactory            db.ResourceCacheFactory
25	dbResourceConfigFactory           db.ResourceConfigFactory
26	dbWorkerBaseResourceTypeFactory   db.WorkerBaseResourceTypeFactory
27	dbTaskCacheFactory                db.TaskCacheFactory
28	dbWorkerTaskCacheFactory          db.WorkerTaskCacheFactory
29	dbVolumeRepository                db.VolumeRepository
30	dbTeamFactory                     db.TeamFactory
31	dbWorkerFactory                   db.WorkerFactory
32	workerVersion                     version.Version
33	baggageclaimResponseHeaderTimeout time.Duration
34	gardenRequestTimeout              time.Duration
35	policyChecker *policy.Checker
36}
37
38func NewDBWorkerProvider(
39	lockFactory lock.LockFactory,
40	retryBackOffFactory retryhttp.BackOffFactory,
41	fetcher Fetcher,
42	imageFactory ImageFactory,
43	dbResourceCacheFactory db.ResourceCacheFactory,
44	dbResourceConfigFactory db.ResourceConfigFactory,
45	dbWorkerBaseResourceTypeFactory db.WorkerBaseResourceTypeFactory,
46	dbTaskCacheFactory db.TaskCacheFactory,
47	dbWorkerTaskCacheFactory db.WorkerTaskCacheFactory,
48	dbVolumeRepository db.VolumeRepository,
49	dbTeamFactory db.TeamFactory,
50	workerFactory db.WorkerFactory,
51	workerVersion version.Version,
52	baggageclaimResponseHeaderTimeout, gardenRequestTimeout time.Duration,
53	policyChecker *policy.Checker,
54) WorkerProvider {
55	return &dbWorkerProvider{
56		lockFactory:                       lockFactory,
57		retryBackOffFactory:               retryBackOffFactory,
58		resourceFetcher:                   fetcher,
59		imageFactory:                      imageFactory,
60		dbResourceCacheFactory:            dbResourceCacheFactory,
61		dbResourceConfigFactory:           dbResourceConfigFactory,
62		dbWorkerBaseResourceTypeFactory:   dbWorkerBaseResourceTypeFactory,
63		dbTaskCacheFactory:                dbTaskCacheFactory,
64		dbWorkerTaskCacheFactory:          dbWorkerTaskCacheFactory,
65		dbVolumeRepository:                dbVolumeRepository,
66		dbTeamFactory:                     dbTeamFactory,
67		dbWorkerFactory:                   workerFactory,
68		workerVersion:                     workerVersion,
69		baggageclaimResponseHeaderTimeout: baggageclaimResponseHeaderTimeout,
70		gardenRequestTimeout:              gardenRequestTimeout,
71		policyChecker: policyChecker,
72	}
73}
74
75func (provider *dbWorkerProvider) RunningWorkers(logger lager.Logger) ([]Worker, error) {
76	savedWorkers, err := provider.dbWorkerFactory.Workers()
77	if err != nil {
78		return nil, err
79	}
80
81	buildContainersCountPerWorker, err := provider.dbWorkerFactory.BuildContainersCountPerWorker()
82	if err != nil {
83		return nil, err
84	}
85
86	workers := []Worker{}
87
88	for _, savedWorker := range savedWorkers {
89		if savedWorker.State() != db.WorkerStateRunning {
90			continue
91		}
92
93		workerLog := logger.Session("running-worker")
94		worker := provider.NewGardenWorker(
95			workerLog,
96			savedWorker,
97			buildContainersCountPerWorker[savedWorker.Name()],
98		)
99		if !worker.IsVersionCompatible(workerLog, provider.workerVersion) {
100			continue
101		}
102
103		workers = append(workers, worker)
104	}
105
106	return workers, nil
107}
108
109func (provider *dbWorkerProvider) FindWorkersForContainerByOwner(
110	logger lager.Logger,
111	owner db.ContainerOwner,
112) ([]Worker, error) {
113	logger = logger.Session("worker-for-container")
114	dbWorkers, err := provider.dbWorkerFactory.FindWorkersForContainerByOwner(owner)
115	if err != nil {
116		return nil, err
117	}
118
119	var workers []Worker
120	for _, w := range dbWorkers {
121		worker := provider.NewGardenWorker(logger, w, 0)
122		if worker.IsVersionCompatible(logger, provider.workerVersion) {
123			workers = append(workers, worker)
124		}
125	}
126
127	return workers, nil
128}
129
130func (provider *dbWorkerProvider) FindWorkerForContainer(
131	logger lager.Logger,
132	teamID int,
133	handle string,
134) (Worker, bool, error) {
135	logger = logger.Session("worker-for-container")
136	team := provider.dbTeamFactory.GetByID(teamID)
137
138	dbWorker, found, err := team.FindWorkerForContainer(handle)
139	if err != nil {
140		return nil, false, err
141	}
142
143	if !found {
144		return nil, false, nil
145	}
146
147	worker := provider.NewGardenWorker(logger, dbWorker, 0)
148	if !worker.IsVersionCompatible(logger, provider.workerVersion) {
149		return nil, false, nil
150	}
151	return worker, true, err
152}
153
154func (provider *dbWorkerProvider) FindWorkerForVolume(
155	logger lager.Logger,
156	teamID int,
157	handle string,
158) (Worker, bool, error) {
159	logger = logger.Session("worker-for-volume")
160	team := provider.dbTeamFactory.GetByID(teamID)
161
162	dbWorker, found, err := team.FindWorkerForVolume(handle)
163	if err != nil {
164		return nil, false, err
165	}
166
167	if !found {
168		return nil, false, nil
169	}
170
171	worker := provider.NewGardenWorker(logger, dbWorker, 0)
172	if !worker.IsVersionCompatible(logger, provider.workerVersion) {
173		return nil, false, nil
174	}
175	return worker, true, err
176}
177
178func (provider *dbWorkerProvider) NewGardenWorker(logger lager.Logger, savedWorker db.Worker, buildContainersCount int) Worker {
179	gcf := gclient.NewGardenClientFactory(
180		provider.dbWorkerFactory,
181		logger.Session("garden-connection"),
182		savedWorker.Name(),
183		savedWorker.GardenAddr(),
184		provider.retryBackOffFactory,
185		provider.gardenRequestTimeout,
186	)
187
188	gClient := gcf.NewClient()
189
190	bClient := bclient.New("", transport.NewBaggageclaimRoundTripper(
191		savedWorker.Name(),
192		savedWorker.BaggageclaimURL(),
193		provider.dbWorkerFactory,
194		&http.Transport{
195			DisableKeepAlives:     true,
196			ResponseHeaderTimeout: provider.baggageclaimResponseHeaderTimeout,
197		},
198	))
199
200	volumeClient := NewVolumeClient(
201		bClient,
202		savedWorker,
203		clock.NewClock(),
204		provider.lockFactory,
205		provider.dbVolumeRepository,
206		provider.dbWorkerBaseResourceTypeFactory,
207		provider.dbTaskCacheFactory,
208		provider.dbWorkerTaskCacheFactory,
209	)
210
211	return NewGardenWorker(
212		gClient,
213		provider.dbVolumeRepository,
214		volumeClient,
215		provider.imageFactory,
216		provider.resourceFetcher,
217		provider.dbTeamFactory,
218		savedWorker,
219		provider.dbResourceCacheFactory,
220		buildContainersCount,
221		provider.policyChecker,
222	)
223}
224