1package db
2
3import (
4	"database/sql"
5	"errors"
6	"fmt"
7	"strings"
8	"time"
9
10	sq "github.com/Masterminds/squirrel"
11	"github.com/concourse/concourse/atc"
12	"github.com/lib/pq"
13	uuid "github.com/nu7hatch/gouuid"
14)
15
16var (
17	ErrWorkerNotPresent         = errors.New("worker not present in db")
18	ErrCannotPruneRunningWorker = errors.New("worker not stalled for pruning")
19)
20
21type ContainerOwnerDisappearedError struct {
22	owner ContainerOwner
23}
24
25func (e ContainerOwnerDisappearedError) Error() string {
26	return fmt.Sprintf("container owner %T disappeared", e.owner)
27}
28
29type WorkerState string
30
31const (
32	WorkerStateRunning  = WorkerState("running")
33	WorkerStateStalled  = WorkerState("stalled")
34	WorkerStateLanding  = WorkerState("landing")
35	WorkerStateLanded   = WorkerState("landed")
36	WorkerStateRetiring = WorkerState("retiring")
37)
38
39func AllWorkerStates() []WorkerState {
40	return []WorkerState{
41		WorkerStateRunning,
42		WorkerStateStalled,
43		WorkerStateLanding,
44		WorkerStateLanded,
45		WorkerStateRetiring,
46	}
47}
48
49//go:generate counterfeiter . Worker
50
51type Worker interface {
52	Name() string
53	Version() *string
54	State() WorkerState
55	GardenAddr() *string
56	BaggageclaimURL() *string
57	CertsPath() *string
58	ResourceCerts() (*UsedWorkerResourceCerts, bool, error)
59	HTTPProxyURL() string
60	HTTPSProxyURL() string
61	NoProxy() string
62	ActiveContainers() int
63	ActiveVolumes() int
64	ResourceTypes() []atc.WorkerResourceType
65	Platform() string
66	Tags() []string
67	TeamID() int
68	TeamName() string
69	StartTime() time.Time
70	ExpiresAt() time.Time
71	Ephemeral() bool
72
73	Reload() (bool, error)
74
75	Land() error
76	Retire() error
77	Prune() error
78	Delete() error
79
80	ActiveTasks() (int, error)
81	IncreaseActiveTasks() error
82	DecreaseActiveTasks() error
83
84	FindContainer(owner ContainerOwner) (CreatingContainer, CreatedContainer, error)
85	CreateContainer(owner ContainerOwner, meta ContainerMetadata) (CreatingContainer, error)
86}
87
88type worker struct {
89	conn Conn
90
91	name             string
92	version          *string
93	state            WorkerState
94	gardenAddr       *string
95	baggageclaimURL  *string
96	httpProxyURL     string
97	httpsProxyURL    string
98	noProxy          string
99	activeContainers int
100	activeVolumes    int
101	activeTasks      int
102	resourceTypes    []atc.WorkerResourceType
103	platform         string
104	tags             []string
105	teamID           int
106	teamName         string
107	startTime        time.Time
108	expiresAt        time.Time
109	certsPath        *string
110	ephemeral        bool
111}
112
113func (worker *worker) Name() string             { return worker.name }
114func (worker *worker) Version() *string         { return worker.version }
115func (worker *worker) State() WorkerState       { return worker.state }
116func (worker *worker) GardenAddr() *string      { return worker.gardenAddr }
117func (worker *worker) CertsPath() *string       { return worker.certsPath }
118func (worker *worker) BaggageclaimURL() *string { return worker.baggageclaimURL }
119
120func (worker *worker) HTTPProxyURL() string                    { return worker.httpProxyURL }
121func (worker *worker) HTTPSProxyURL() string                   { return worker.httpsProxyURL }
122func (worker *worker) NoProxy() string                         { return worker.noProxy }
123func (worker *worker) ActiveContainers() int                   { return worker.activeContainers }
124func (worker *worker) ActiveVolumes() int                      { return worker.activeVolumes }
125func (worker *worker) ResourceTypes() []atc.WorkerResourceType { return worker.resourceTypes }
126func (worker *worker) Platform() string                        { return worker.platform }
127func (worker *worker) Tags() []string                          { return worker.tags }
128func (worker *worker) TeamID() int                             { return worker.teamID }
129func (worker *worker) TeamName() string                        { return worker.teamName }
130func (worker *worker) Ephemeral() bool                         { return worker.ephemeral }
131
132func (worker *worker) StartTime() time.Time { return worker.startTime }
133func (worker *worker) ExpiresAt() time.Time { return worker.expiresAt }
134
135func (worker *worker) Reload() (bool, error) {
136	row := workersQuery.Where(sq.Eq{"w.name": worker.name}).
137		RunWith(worker.conn).
138		QueryRow()
139
140	err := scanWorker(worker, row)
141	if err != nil {
142		if err == sql.ErrNoRows {
143			return false, nil
144		}
145		return false, err
146	}
147
148	return true, nil
149}
150
151func (worker *worker) Land() error {
152	cSQL, _, err := sq.Case("state").
153		When("'landed'::worker_state", "'landed'::worker_state").
154		Else("'landing'::worker_state").
155		ToSql()
156	if err != nil {
157		return err
158	}
159
160	result, err := psql.Update("workers").
161		Set("state", sq.Expr("("+cSQL+")")).
162		Where(sq.Eq{"name": worker.name}).
163		RunWith(worker.conn).
164		Exec()
165
166	if err != nil {
167		return err
168	}
169
170	count, err := result.RowsAffected()
171	if err != nil {
172		return err
173	}
174
175	if count == 0 {
176		return ErrWorkerNotPresent
177	}
178
179	return nil
180}
181
182func (worker *worker) Retire() error {
183	result, err := psql.Update("workers").
184		SetMap(map[string]interface{}{
185			"state": string(WorkerStateRetiring),
186		}).
187		Where(sq.Eq{"name": worker.name}).
188		RunWith(worker.conn).
189		Exec()
190	if err != nil {
191		return err
192	}
193
194	count, err := result.RowsAffected()
195	if err != nil {
196		return err
197	}
198
199	if count == 0 {
200		return ErrWorkerNotPresent
201	}
202
203	return nil
204}
205
206func (worker *worker) Prune() error {
207	tx, err := worker.conn.Begin()
208	if err != nil {
209		return err
210	}
211
212	defer Rollback(tx)
213
214	rows, err := sq.Delete("workers").
215		Where(sq.Eq{
216			"name": worker.name,
217		}).
218		Where(sq.NotEq{
219			"state": string(WorkerStateRunning),
220		}).
221		PlaceholderFormat(sq.Dollar).
222		RunWith(tx).
223		Exec()
224
225	if err != nil {
226		return err
227	}
228
229	affected, err := rows.RowsAffected()
230	if err != nil {
231		return err
232	}
233
234	if affected == 0 {
235		//check whether the worker exists in the database at all
236		var one int
237		err := psql.Select("1").From("workers").Where(sq.Eq{"name": worker.name}).
238			RunWith(tx).
239			QueryRow().
240			Scan(&one)
241		if err != nil {
242			if err == sql.ErrNoRows {
243				return ErrWorkerNotPresent
244			}
245			return err
246		}
247
248		return ErrCannotPruneRunningWorker
249	}
250
251	return tx.Commit()
252}
253
254func (worker *worker) Delete() error {
255	_, err := sq.Delete("workers").
256		Where(sq.Eq{
257			"name": worker.name,
258		}).
259		PlaceholderFormat(sq.Dollar).
260		RunWith(worker.conn).
261		Exec()
262
263	return err
264}
265
266func (worker *worker) ResourceCerts() (*UsedWorkerResourceCerts, bool, error) {
267	if worker.certsPath != nil {
268		wrc := &WorkerResourceCerts{
269			WorkerName: worker.name,
270			CertsPath:  *worker.certsPath,
271		}
272
273		return wrc.Find(worker.conn)
274	}
275
276	return nil, false, nil
277}
278
279func (worker *worker) FindContainer(owner ContainerOwner) (CreatingContainer, CreatedContainer, error) {
280	ownerQuery, found, err := owner.Find(worker.conn)
281	if err != nil {
282		return nil, nil, err
283	}
284
285	if !found {
286		return nil, nil, nil
287	}
288
289	return worker.findContainer(sq.And{
290		sq.Eq{"worker_name": worker.name},
291		ownerQuery,
292	})
293}
294
295func (worker *worker) CreateContainer(owner ContainerOwner, meta ContainerMetadata) (CreatingContainer, error) {
296	handle, err := uuid.NewV4()
297	if err != nil {
298		return nil, err
299	}
300
301	var containerID int
302	cols := []interface{}{&containerID}
303
304	metadata := &ContainerMetadata{}
305	cols = append(cols, metadata.ScanTargets()...)
306
307	tx, err := worker.conn.Begin()
308	if err != nil {
309		return nil, err
310	}
311
312	defer Rollback(tx)
313
314	insMap := meta.SQLMap()
315	insMap["worker_name"] = worker.name
316	insMap["handle"] = handle.String()
317
318	ownerCols, err := owner.Create(tx, worker.name)
319	if err != nil {
320		return nil, err
321	}
322
323	for k, v := range ownerCols {
324		insMap[k] = v
325	}
326
327	err = psql.Insert("containers").
328		SetMap(insMap).
329		Suffix("RETURNING id, " + strings.Join(containerMetadataColumns, ", ")).
330		RunWith(tx).
331		QueryRow().
332		Scan(cols...)
333	if err != nil {
334		if pqErr, ok := err.(*pq.Error); ok && pqErr.Code.Name() == pqFKeyViolationErrCode {
335			return nil, ContainerOwnerDisappearedError{owner}
336		}
337
338		return nil, err
339	}
340
341	err = tx.Commit()
342	if err != nil {
343		return nil, err
344	}
345
346	return newCreatingContainer(
347		containerID,
348		handle.String(),
349		worker.name,
350		*metadata,
351		worker.conn,
352	), nil
353}
354
355func (worker *worker) findContainer(whereClause sq.Sqlizer) (CreatingContainer, CreatedContainer, error) {
356	creating, created, destroying, _, err := scanContainer(
357		selectContainers().
358			Where(whereClause).
359			RunWith(worker.conn).
360			QueryRow(),
361		worker.conn,
362	)
363	if err != nil {
364		if err == sql.ErrNoRows {
365			return nil, nil, nil
366		}
367		return nil, nil, err
368	}
369
370	if destroying != nil {
371		return nil, nil, nil
372	}
373
374	return creating, created, nil
375}
376
377func (worker *worker) ActiveTasks() (int, error) {
378	err := psql.Select("active_tasks").From("workers").Where(sq.Eq{"name": worker.name}).
379		RunWith(worker.conn).
380		QueryRow().
381		Scan(&worker.activeTasks)
382	if err != nil {
383		return 0, err
384	}
385	return worker.activeTasks, nil
386}
387
388func (worker *worker) IncreaseActiveTasks() error {
389	result, err := psql.Update("workers").
390		Set("active_tasks", sq.Expr("active_tasks+1")).
391		Where(sq.Eq{"name": worker.name}).
392		RunWith(worker.conn).
393		Exec()
394	if err != nil {
395		return err
396	}
397
398	count, err := result.RowsAffected()
399	if err != nil {
400		return err
401	}
402
403	if count == 0 {
404		return ErrWorkerNotPresent
405	}
406
407	return nil
408}
409
410func (worker *worker) DecreaseActiveTasks() error {
411	result, err := psql.Update("workers").
412		Set("active_tasks", sq.Expr("active_tasks-1")).
413		Where(sq.Eq{"name": worker.name}).
414		RunWith(worker.conn).
415		Exec()
416	if err != nil {
417		return err
418	}
419
420	count, err := result.RowsAffected()
421	if err != nil {
422		return err
423	}
424
425	if count == 0 {
426		return ErrWorkerNotPresent
427	}
428
429	return nil
430}
431