1package db
2
3import (
4	"database/sql"
5
6	sq "github.com/Masterminds/squirrel"
7)
8
9//go:generate counterfeiter . WorkerLifecycle
10
// WorkerLifecycle groups the bulk state transitions applied to rows of the
// workers table, plus a lookup of every worker's current state.
//
// The per-method notes describe the workerLifecycle implementation in this
// package. Each transition returns the names of the workers it affected.
type WorkerLifecycle interface {
	// DeleteUnresponsiveEphemeralWorkers deletes workers flagged ephemeral
	// whose expires value is earlier than the database's NOW().
	DeleteUnresponsiveEphemeralWorkers() ([]string, error)
	// StallUnresponsiveWorkers moves running workers whose expires value is
	// earlier than NOW() to the stalled state and clears their expires column.
	StallUnresponsiveWorkers() ([]string, error)
	// LandFinishedLandingWorkers moves landing workers to the landed state,
	// clearing addr and baggageclaim_url, unless they still host a container
	// for an incomplete build that has no job or a non-interruptible job.
	LandFinishedLandingWorkers() ([]string, error)
	// DeleteFinishedRetiringWorkers deletes retiring workers unless they
	// still host a container for an incomplete build that has no job or a
	// non-interruptible job.
	DeleteFinishedRetiringWorkers() ([]string, error)
	// GetWorkerStateByName returns the state of every worker keyed by name.
	GetWorkerStateByName() (map[string]WorkerState, error)
}
18
// workerLifecycle implements WorkerLifecycle by running SQL statements
// against the workers table.
type workerLifecycle struct {
	// conn is the connection every query in this type is executed on.
	conn Conn
}
22
23func NewWorkerLifecycle(conn Conn) WorkerLifecycle {
24	return &workerLifecycle{
25		conn: conn,
26	}
27}
28
29func (lifecycle *workerLifecycle) DeleteUnresponsiveEphemeralWorkers() ([]string, error) {
30	query, args, err := psql.Delete("workers").
31		Where(sq.Eq{"ephemeral": true}).
32		Where(sq.Expr("expires < NOW()")).
33		Suffix("RETURNING name").
34		ToSql()
35
36	if err != nil {
37		return []string{}, err
38	}
39
40	rows, err := lifecycle.conn.Query(query, args...)
41	if err != nil {
42		return nil, err
43	}
44
45	return workersAffected(rows)
46}
47
48func (lifecycle *workerLifecycle) StallUnresponsiveWorkers() ([]string, error) {
49	query, args, err := psql.Update("workers").
50		SetMap(map[string]interface{}{
51			"state":   string(WorkerStateStalled),
52			"expires": nil,
53		}).
54		Where(sq.Eq{"state": string(WorkerStateRunning)}).
55		Where(sq.Expr("expires < NOW()")).
56		Suffix("RETURNING name").
57		ToSql()
58	if err != nil {
59		return []string{}, err
60	}
61
62	rows, err := lifecycle.conn.Query(query, args...)
63	if err != nil {
64		return nil, err
65	}
66
67	return workersAffected(rows)
68}
69
70func (lifecycle *workerLifecycle) DeleteFinishedRetiringWorkers() ([]string, error) {
71	// Squirrel does not have default support for subqueries in where clauses.
72	// We hacked together a way to do it
73	//
74	// First we generate the subquery's SQL and args using
75	// sq.Select instead of psql.Select so that we get
76	// unordered placeholders instead of psql's ordered placeholders
77	subQ, subQArgs, err := sq.Select("w.name").
78		Distinct().
79		From("builds b").
80		Join("containers c ON b.id = c.build_id").
81		Join("workers w ON w.name = c.worker_name").
82		LeftJoin("jobs j ON j.id = b.job_id").
83		Where(sq.Eq{"b.completed": false}).
84		Where(sq.Or{
85			sq.Eq{
86				"j.interruptible": false,
87			},
88			sq.Eq{
89				"b.job_id": nil,
90			},
91		}).ToSql()
92
93	if err != nil {
94		return []string{}, err
95	}
96
97	// Then we inject the subquery sql directly into
98	// the where clause, and "add" the args from the
99	// first query to the second query's args
100	//
101	// We use sq.Delete instead of psql.Delete for the same reason
102	// but then change the placeholders using .PlaceholderFormat(sq.Dollar)
103	// to go back to postgres's format
104	query, args, err := sq.Delete("workers").
105		Where(sq.Eq{
106			"state": string(WorkerStateRetiring),
107		}).
108		Where("name NOT IN ("+subQ+")", subQArgs...).
109		PlaceholderFormat(sq.Dollar).
110		Suffix("RETURNING name").
111		ToSql()
112
113	if err != nil {
114		return []string{}, err
115	}
116
117	rows, err := lifecycle.conn.Query(query, args...)
118	if err != nil {
119		return nil, err
120	}
121
122	return workersAffected(rows)
123}
124
125func (lifecycle *workerLifecycle) LandFinishedLandingWorkers() ([]string, error) {
126	subQ, subQArgs, err := sq.Select("w.name").
127		Distinct().
128		From("builds b").
129		Join("containers c ON b.id = c.build_id").
130		Join("workers w ON w.name = c.worker_name").
131		LeftJoin("jobs j ON j.id = b.job_id").
132		Where(sq.Eq{"b.completed": false}).
133		Where(sq.Or{
134			sq.Eq{
135				"j.interruptible": false,
136			},
137			sq.Eq{
138				"b.job_id": nil,
139			},
140		}).ToSql()
141
142	if err != nil {
143		return nil, err
144	}
145
146	query, args, err := sq.Update("workers").
147		Set("state", string(WorkerStateLanded)).
148		Set("addr", nil).
149		Set("baggageclaim_url", nil).
150		Where(sq.Eq{
151			"state": string(WorkerStateLanding),
152		}).
153		Where("name NOT IN ("+subQ+")", subQArgs...).
154		PlaceholderFormat(sq.Dollar).
155		Suffix("RETURNING name").
156		ToSql()
157
158	if err != nil {
159		return []string{}, err
160	}
161
162	rows, err := lifecycle.conn.Query(query, args...)
163	if err != nil {
164		return nil, err
165	}
166
167	return workersAffected(rows)
168}
169
170func (lifecycle *workerLifecycle) GetWorkerStateByName() (map[string]WorkerState, error) {
171	rows, err := psql.Select(`
172		name,
173		state
174	`).
175		From("workers").
176		RunWith(lifecycle.conn).
177		Query()
178
179	if err != nil {
180		return nil, err
181	}
182
183	defer Close(rows)
184	var name string
185	var state WorkerState
186
187	workerStateByName := make(map[string]WorkerState)
188
189	for rows.Next() {
190		err := rows.Scan(
191			&name,
192			&state,
193		)
194		if err != nil {
195			return nil, err
196		}
197		workerStateByName[name] = state
198	}
199
200	return workerStateByName, nil
201
202}
203func workersAffected(rows *sql.Rows) ([]string, error) {
204	var (
205		err         error
206		workerNames []string
207	)
208
209	defer Close(rows)
210
211	for rows.Next() {
212		var name string
213
214		err = rows.Scan(&name)
215		if err != nil {
216			return nil, err
217		}
218
219		workerNames = append(workerNames, name)
220	}
221
222	return workerNames, err
223}
224