1package worker
2
3import (
4	"errors"
5	"fmt"
6	"time"
7
8	"code.cloudfoundry.org/clock"
9	"code.cloudfoundry.org/lager"
10	"github.com/concourse/baggageclaim"
11	"github.com/concourse/concourse/atc/db"
12	"github.com/concourse/concourse/atc/db/lock"
13	"github.com/concourse/concourse/atc/metric"
14)
15
// creatingVolumeRetryDelay is how long findOrCreateVolume sleeps before
// retrying when the volume-creating lock could not be acquired.
const creatingVolumeRetryDelay = time.Second
17
18//go:generate counterfeiter . VolumeClient
19
20type VolumeClient interface {
21	FindOrCreateVolumeForContainer(
22		lager.Logger,
23		VolumeSpec,
24		db.CreatingContainer,
25		int,
26		string,
27	) (Volume, error)
28	FindOrCreateCOWVolumeForContainer(
29		lager.Logger,
30		VolumeSpec,
31		db.CreatingContainer,
32		Volume,
33		int,
34		string,
35	) (Volume, error)
36	FindOrCreateVolumeForBaseResourceType(
37		lager.Logger,
38		VolumeSpec,
39		int,
40		string,
41	) (Volume, error)
42	CreateVolume(
43		lager.Logger,
44		VolumeSpec,
45		int,
46		string,
47		db.VolumeType,
48	) (Volume, error)
49	FindVolumeForResourceCache(
50		lager.Logger,
51		db.UsedResourceCache,
52	) (Volume, bool, error)
53	FindVolumeForTaskCache(
54		logger lager.Logger,
55		teamID int,
56		jobID int,
57		stepName string,
58		path string,
59	) (Volume, bool, error)
60	CreateVolumeForTaskCache(
61		logger lager.Logger,
62		volumeSpec VolumeSpec,
63		teamID int,
64		jobID int,
65		stepName string,
66		path string,
67	) (Volume, error)
68	FindOrCreateVolumeForResourceCerts(
69		logger lager.Logger,
70	) (volume Volume, found bool, err error)
71
72	LookupVolume(lager.Logger, string) (Volume, bool, error)
73}
74
75type VolumeSpec struct {
76	Strategy   baggageclaim.Strategy
77	Properties VolumeProperties
78	Privileged bool
79	TTL        time.Duration
80}
81
82func (spec VolumeSpec) baggageclaimVolumeSpec() baggageclaim.VolumeSpec {
83	return baggageclaim.VolumeSpec{
84		Strategy:   spec.Strategy,
85		Privileged: spec.Privileged,
86		Properties: baggageclaim.VolumeProperties(spec.Properties),
87	}
88}
89
90type VolumeProperties map[string]string
91
// ErrCreatedVolumeNotFound reports that a volume recorded as created in the
// database could not be found in baggageclaim on its worker.
type ErrCreatedVolumeNotFound struct {
	// Handle is the handle of the missing volume.
	Handle string

	// WorkerName is the name of the worker the volume was expected on.
	WorkerName string
}

// Error implements the error interface.
func (err ErrCreatedVolumeNotFound) Error() string {
	const format = "volume '%s' disappeared from worker '%s'"

	return fmt.Sprintf(format, err.Handle, err.WorkerName)
}

// ErrBaseResourceTypeNotFound is returned when the requested base resource
// type is not registered for the worker.
var ErrBaseResourceTypeNotFound = errors.New("base resource type not found")
102
103type volumeClient struct {
104	baggageclaimClient              baggageclaim.Client
105	lockFactory                     lock.LockFactory
106	dbVolumeRepository              db.VolumeRepository
107	dbWorkerBaseResourceTypeFactory db.WorkerBaseResourceTypeFactory
108	dbTaskCacheFactory              db.TaskCacheFactory
109	dbWorkerTaskCacheFactory        db.WorkerTaskCacheFactory
110	clock                           clock.Clock
111	dbWorker                        db.Worker
112}
113
114func NewVolumeClient(
115	baggageclaimClient baggageclaim.Client,
116	dbWorker db.Worker,
117	clock clock.Clock,
118
119	lockFactory lock.LockFactory,
120	dbVolumeRepository db.VolumeRepository,
121	dbWorkerBaseResourceTypeFactory db.WorkerBaseResourceTypeFactory,
122	dbTaskCacheFactory db.TaskCacheFactory,
123	dbWorkerTaskCacheFactory db.WorkerTaskCacheFactory,
124) VolumeClient {
125	return &volumeClient{
126		baggageclaimClient:              baggageclaimClient,
127		lockFactory:                     lockFactory,
128		dbVolumeRepository:              dbVolumeRepository,
129		dbWorkerBaseResourceTypeFactory: dbWorkerBaseResourceTypeFactory,
130		dbTaskCacheFactory:              dbTaskCacheFactory,
131		dbWorkerTaskCacheFactory:        dbWorkerTaskCacheFactory,
132		clock:                           clock,
133		dbWorker:                        dbWorker,
134	}
135}
136
137func (c *volumeClient) FindOrCreateVolumeForContainer(
138	logger lager.Logger,
139	volumeSpec VolumeSpec,
140	container db.CreatingContainer,
141	teamID int,
142	mountPath string,
143) (Volume, error) {
144	return c.findOrCreateVolume(
145		logger.Session("find-or-create-volume-for-container"),
146		volumeSpec,
147		func() (db.CreatingVolume, db.CreatedVolume, error) {
148			return c.dbVolumeRepository.FindContainerVolume(teamID, c.dbWorker.Name(), container, mountPath)
149		},
150		func() (db.CreatingVolume, error) {
151			return c.dbVolumeRepository.CreateContainerVolume(teamID, c.dbWorker.Name(), container, mountPath)
152		},
153	)
154}
155
156func (c *volumeClient) FindOrCreateCOWVolumeForContainer(
157	logger lager.Logger,
158	volumeSpec VolumeSpec,
159	container db.CreatingContainer,
160	parent Volume,
161	teamID int,
162	mountPath string,
163) (Volume, error) {
164	return c.findOrCreateVolume(
165		logger.Session("find-or-create-cow-volume-for-container"),
166		volumeSpec,
167		func() (db.CreatingVolume, db.CreatedVolume, error) {
168			return c.dbVolumeRepository.FindContainerVolume(teamID, c.dbWorker.Name(), container, mountPath)
169		},
170		func() (db.CreatingVolume, error) {
171			return parent.CreateChildForContainer(container, mountPath)
172		},
173	)
174}
175
176func (c *volumeClient) CreateVolume(
177	logger lager.Logger,
178	volumeSpec VolumeSpec,
179	teamID int,
180	workerName string,
181	volumeType db.VolumeType,
182) (Volume, error) {
183	return c.findOrCreateVolume(
184		logger.Session("find-or-create-volume-for-artifact"),
185		volumeSpec,
186		func() (db.CreatingVolume, db.CreatedVolume, error) {
187			return nil, nil, nil
188		},
189		func() (db.CreatingVolume, error) {
190			return c.dbVolumeRepository.CreateVolume(teamID, workerName, volumeType)
191		},
192	)
193}
194
195func (c *volumeClient) FindOrCreateVolumeForBaseResourceType(
196	logger lager.Logger,
197	volumeSpec VolumeSpec,
198	teamID int,
199	resourceTypeName string,
200) (Volume, error) {
201	workerBaseResourceType, found, err := c.dbWorkerBaseResourceTypeFactory.Find(resourceTypeName, c.dbWorker)
202	if err != nil {
203		return nil, err
204	}
205
206	if !found {
207		logger.Error("base-resource-type-not-found", ErrBaseResourceTypeNotFound, lager.Data{"resource-type-name": resourceTypeName})
208		return nil, ErrBaseResourceTypeNotFound
209	}
210
211	return c.findOrCreateVolume(
212		logger.Session("find-or-create-volume-for-base-resource-type"),
213		volumeSpec,
214		func() (db.CreatingVolume, db.CreatedVolume, error) {
215			return c.dbVolumeRepository.FindBaseResourceTypeVolume(workerBaseResourceType)
216		},
217		func() (db.CreatingVolume, error) {
218			return c.dbVolumeRepository.CreateBaseResourceTypeVolume(workerBaseResourceType)
219		},
220	)
221}
222
223func (c *volumeClient) FindVolumeForResourceCache(
224	logger lager.Logger,
225	usedResourceCache db.UsedResourceCache,
226) (Volume, bool, error) {
227	dbVolume, found, err := c.dbVolumeRepository.FindResourceCacheVolume(c.dbWorker.Name(), usedResourceCache)
228	if err != nil {
229		logger.Error("failed-to-lookup-resource-cache-volume-in-db", err)
230		return nil, false, err
231	}
232
233	if !found {
234		return nil, false, nil
235	}
236
237	bcVolume, found, err := c.baggageclaimClient.LookupVolume(logger, dbVolume.Handle())
238	if err != nil {
239		logger.Error("failed-to-lookup-volume-in-bc", err)
240		return nil, false, err
241	}
242
243	if !found {
244		return nil, false, nil
245	}
246
247	return NewVolume(bcVolume, dbVolume, c), true, nil
248}
249
250func (c *volumeClient) CreateVolumeForTaskCache(
251	logger lager.Logger,
252	volumeSpec VolumeSpec,
253	teamID int,
254	jobID int,
255	stepName string,
256	path string,
257) (Volume, error) {
258	usedTaskCache, err := c.dbTaskCacheFactory.FindOrCreate(jobID, stepName, path)
259	if err != nil {
260		logger.Error("failed-to-find-or-create-task-cache-in-db", err)
261		return nil, err
262	}
263
264	workerTaskCache := db.WorkerTaskCache{
265		WorkerName: c.dbWorker.Name(),
266		TaskCache:  usedTaskCache,
267	}
268
269	usedWorkerTaskCache, err := c.dbWorkerTaskCacheFactory.FindOrCreate(workerTaskCache)
270
271	return c.findOrCreateVolume(
272		logger.Session("find-or-create-volume-for-container"),
273		volumeSpec,
274		func() (db.CreatingVolume, db.CreatedVolume, error) {
275			return nil, nil, nil
276		},
277		func() (db.CreatingVolume, error) {
278			return c.dbVolumeRepository.CreateTaskCacheVolume(teamID, usedWorkerTaskCache)
279		},
280	)
281}
282
283func (c *volumeClient) FindOrCreateVolumeForResourceCerts(logger lager.Logger) (Volume, bool, error) {
284
285	logger.Debug("finding-worker-resource-certs")
286	usedResourceCerts, found, err := c.dbWorker.ResourceCerts()
287	if err != nil {
288		logger.Error("failed-to-find-worker-resource-certs", err)
289		return nil, false, err
290	}
291
292	if !found {
293		logger.Debug("worker-resource-certs-not-found")
294		return nil, false, nil
295	}
296
297	certsPath := c.dbWorker.CertsPath()
298	if certsPath == nil {
299		logger.Debug("worker-certs-path-is-empty")
300		return nil, false, nil
301	}
302
303	volume, err := c.findOrCreateVolume(
304		logger.Session("find-or-create-volume-for-resource-certs"),
305		VolumeSpec{
306			Strategy: baggageclaim.ImportStrategy{
307				Path:           *certsPath,
308				FollowSymlinks: true,
309			},
310		},
311		func() (db.CreatingVolume, db.CreatedVolume, error) {
312			return c.dbVolumeRepository.FindResourceCertsVolume(c.dbWorker.Name(), usedResourceCerts)
313		},
314		func() (db.CreatingVolume, error) {
315			return c.dbVolumeRepository.CreateResourceCertsVolume(c.dbWorker.Name(), usedResourceCerts)
316		},
317	)
318
319	return volume, true, err
320}
321
322func (c *volumeClient) FindVolumeForTaskCache(
323	logger lager.Logger,
324	teamID int,
325	jobID int,
326	stepName string,
327	path string,
328) (Volume, bool, error) {
329	usedTaskCache, found, err := c.dbTaskCacheFactory.Find(jobID, stepName, path)
330	if err != nil {
331		logger.Error("failed-to-lookup-task-cache-in-db", err)
332		return nil, false, err
333	}
334
335	if !found {
336		return nil, false, nil
337	}
338
339	dbVolume, found, err := c.dbVolumeRepository.FindTaskCacheVolume(teamID, c.dbWorker.Name(), usedTaskCache)
340	if err != nil {
341		logger.Error("failed-to-lookup-task-cache-volume-in-db", err)
342		return nil, false, err
343	}
344
345	if !found {
346		return nil, false, nil
347	}
348
349	bcVolume, found, err := c.baggageclaimClient.LookupVolume(logger, dbVolume.Handle())
350	if err != nil {
351		logger.Error("failed-to-lookup-volume-in-bc", err)
352		return nil, false, err
353	}
354
355	if !found {
356		return nil, false, nil
357	}
358
359	return NewVolume(bcVolume, dbVolume, c), true, nil
360}
361
362func (c *volumeClient) LookupVolume(logger lager.Logger, handle string) (Volume, bool, error) {
363	dbVolume, found, err := c.dbVolumeRepository.FindCreatedVolume(handle)
364	if err != nil {
365		logger.Error("failed-to-lookup-volume-in-db", err)
366		return nil, false, err
367	}
368
369	if !found {
370		return nil, false, nil
371	}
372
373	bcVolume, found, err := c.baggageclaimClient.LookupVolume(logger, handle)
374	if err != nil {
375		logger.Error("failed-to-lookup-volume-in-bc", err)
376		return nil, false, err
377	}
378
379	if !found {
380		return nil, false, nil
381	}
382
383	return NewVolume(bcVolume, dbVolume, c), true, nil
384}
385
// findOrCreateVolume is the shared find-or-create routine behind the public
// volume methods.
//
// findVolumeFunc reports the existing database state for the wanted volume:
// a creating volume, a created volume, or neither (both nil).
// createVolumeFunc creates a new creating-volume record and is only called
// when findVolumeFunc reported neither.
//
// Flow:
//   - If a created volume exists in the database, it must also exist in
//     baggageclaim; otherwise ErrCreatedVolumeNotFound is returned.
//   - Otherwise a creating volume is found or created, the volume-creating
//     lock for its ID is acquired, the baggageclaim volume is looked up or
//     created from volumeSpec, and the record is transitioned to created.
//   - If the lock cannot be acquired, the function sleeps for
//     creatingVolumeRetryDelay and starts over.
//
// NOTE(review): the retry is implemented by recursion with no attempt limit.
func (c *volumeClient) findOrCreateVolume(
	logger lager.Logger,
	volumeSpec VolumeSpec,
	findVolumeFunc func() (db.CreatingVolume, db.CreatedVolume, error),
	createVolumeFunc func() (db.CreatingVolume, error),
) (Volume, error) {
	creatingVolume, createdVolume, err := findVolumeFunc()
	if err != nil {
		logger.Error("failed-to-find-volume-in-db", err)
		return nil, err
	}

	// Fast path: the database already records the volume as created.
	if createdVolume != nil {
		logger = logger.WithData(lager.Data{"volume": createdVolume.Handle()})

		bcVolume, bcVolumeFound, err := c.baggageclaimClient.LookupVolume(
			logger.Session("lookup-volume"),
			createdVolume.Handle(),
		)
		if err != nil {
			logger.Error("failed-to-lookup-volume-in-baggageclaim", err)
			return nil, err
		}

		// The database says the volume was created but baggageclaim does not
		// have it; report that to the caller rather than recreating it here.
		if !bcVolumeFound {
			logger.Info("created-volume-not-found")
			return nil, ErrCreatedVolumeNotFound{Handle: createdVolume.Handle(), WorkerName: createdVolume.WorkerName()}
		}

		logger.Debug("found-created-volume")

		return NewVolume(bcVolume, createdVolume, c), nil
	}

	// Reuse an existing creating-volume record, or create one now.
	if creatingVolume != nil {
		logger = logger.WithData(lager.Data{"volume": creatingVolume.Handle()})
		logger.Debug("found-creating-volume")
	} else {
		creatingVolume, err = createVolumeFunc()
		if err != nil {
			logger.Error("failed-to-create-volume-in-db", err)
			return nil, err
		}

		logger = logger.WithData(lager.Data{"volume": creatingVolume.Handle()})

		logger.Debug("created-creating-volume")
	}

	// The lock is keyed by the creating volume's ID.
	lock, acquired, err := c.lockFactory.Acquire(logger, lock.NewVolumeCreatingLockID(creatingVolume.ID()))
	if err != nil {
		logger.Error("failed-to-acquire-volume-creating-lock", err)
		return nil, err
	}

	// Lock not acquired: wait, then restart from the database lookup so the
	// next attempt sees the current state of the volume.
	if !acquired {
		c.clock.Sleep(creatingVolumeRetryDelay)
		return c.findOrCreateVolume(logger, volumeSpec, findVolumeFunc, createVolumeFunc)
	}

	// Hold the lock until this function returns; the result of Release is
	// not checked.
	defer lock.Release()

	// The baggageclaim volume may already exist for this handle even though
	// the database record is still in the creating state.
	bcVolume, bcVolumeFound, err := c.baggageclaimClient.LookupVolume(
		logger.Session("create-volume"),
		creatingVolume.Handle(),
	)
	if err != nil {
		logger.Error("failed-to-lookup-volume-in-baggageclaim", err)
		return nil, err
	}

	if bcVolumeFound {
		logger.Debug("real-volume-exists")
	} else {
		logger.Debug("creating-real-volume")

		bcVolume, err = c.baggageclaimClient.CreateVolume(
			logger.Session("create-volume"),
			creatingVolume.Handle(),
			volumeSpec.baggageclaimVolumeSpec(),
		)
		if err != nil {
			logger.Error("failed-to-create-volume-in-baggageclaim", err)

			// Best effort: mark the database record as failed. A failure to
			// do so is only logged; the original creation error is returned.
			_, failedErr := creatingVolume.Failed()
			if failedErr != nil {
				logger.Error("failed-to-mark-volume-as-failed", failedErr)
			}

			metric.Metrics.FailedVolumes.Inc()

			return nil, err
		}

		metric.Metrics.VolumesCreated.Inc()
	}

	// Transition the database record from creating to created.
	createdVolume, err = creatingVolume.Created()
	if err != nil {
		logger.Error("failed-to-initialize-volume", err)
		return nil, err
	}

	logger.Debug("created")

	return NewVolume(bcVolume, createdVolume, c), nil
}
493