1package db 2 3import ( 4 "database/sql" 5 6 sq "github.com/Masterminds/squirrel" 7) 8 9//go:generate counterfeiter . WorkerLifecycle 10 11type WorkerLifecycle interface { 12 DeleteUnresponsiveEphemeralWorkers() ([]string, error) 13 StallUnresponsiveWorkers() ([]string, error) 14 LandFinishedLandingWorkers() ([]string, error) 15 DeleteFinishedRetiringWorkers() ([]string, error) 16 GetWorkerStateByName() (map[string]WorkerState, error) 17} 18 19type workerLifecycle struct { 20 conn Conn 21} 22 23func NewWorkerLifecycle(conn Conn) WorkerLifecycle { 24 return &workerLifecycle{ 25 conn: conn, 26 } 27} 28 29func (lifecycle *workerLifecycle) DeleteUnresponsiveEphemeralWorkers() ([]string, error) { 30 query, args, err := psql.Delete("workers"). 31 Where(sq.Eq{"ephemeral": true}). 32 Where(sq.Expr("expires < NOW()")). 33 Suffix("RETURNING name"). 34 ToSql() 35 36 if err != nil { 37 return []string{}, err 38 } 39 40 rows, err := lifecycle.conn.Query(query, args...) 41 if err != nil { 42 return nil, err 43 } 44 45 return workersAffected(rows) 46} 47 48func (lifecycle *workerLifecycle) StallUnresponsiveWorkers() ([]string, error) { 49 query, args, err := psql.Update("workers"). 50 SetMap(map[string]interface{}{ 51 "state": string(WorkerStateStalled), 52 "expires": nil, 53 }). 54 Where(sq.Eq{"state": string(WorkerStateRunning)}). 55 Where(sq.Expr("expires < NOW()")). 56 Suffix("RETURNING name"). 57 ToSql() 58 if err != nil { 59 return []string{}, err 60 } 61 62 rows, err := lifecycle.conn.Query(query, args...) 63 if err != nil { 64 return nil, err 65 } 66 67 return workersAffected(rows) 68} 69 70func (lifecycle *workerLifecycle) DeleteFinishedRetiringWorkers() ([]string, error) { 71 // Squirrel does not have default support for subqueries in where clauses. 72 // We hacked together a way to do it 73 // 74 // First we generate the subquery's SQL and args using 75 // sq.Select instead of psql.Select so that we get 76 // unordered placeholders instead of psql's ordered placeholders 77 subQ, subQArgs, err := sq.Select("w.name"). 78 Distinct(). 79 From("builds b"). 80 Join("containers c ON b.id = c.build_id"). 81 Join("workers w ON w.name = c.worker_name"). 82 LeftJoin("jobs j ON j.id = b.job_id"). 83 Where(sq.Eq{"b.completed": false}). 84 Where(sq.Or{ 85 sq.Eq{ 86 "j.interruptible": false, 87 }, 88 sq.Eq{ 89 "b.job_id": nil, 90 }, 91 }).ToSql() 92 93 if err != nil { 94 return []string{}, err 95 } 96 97 // Then we inject the subquery sql directly into 98 // the where clause, and "add" the args from the 99 // first query to the second query's args 100 // 101 // We use sq.Delete instead of psql.Delete for the same reason 102 // but then change the placeholders using .PlaceholderFormat(sq.Dollar) 103 // to go back to postgres's format 104 query, args, err := sq.Delete("workers"). 105 Where(sq.Eq{ 106 "state": string(WorkerStateRetiring), 107 }). 108 Where("name NOT IN ("+subQ+")", subQArgs...). 109 PlaceholderFormat(sq.Dollar). 110 Suffix("RETURNING name"). 111 ToSql() 112 113 if err != nil { 114 return []string{}, err 115 } 116 117 rows, err := lifecycle.conn.Query(query, args...) 118 if err != nil { 119 return nil, err 120 } 121 122 return workersAffected(rows) 123} 124 125func (lifecycle *workerLifecycle) LandFinishedLandingWorkers() ([]string, error) { 126 subQ, subQArgs, err := sq.Select("w.name"). 127 Distinct(). 128 From("builds b"). 129 Join("containers c ON b.id = c.build_id"). 130 Join("workers w ON w.name = c.worker_name"). 131 LeftJoin("jobs j ON j.id = b.job_id"). 132 Where(sq.Eq{"b.completed": false}). 133 Where(sq.Or{ 134 sq.Eq{ 135 "j.interruptible": false, 136 }, 137 sq.Eq{ 138 "b.job_id": nil, 139 }, 140 }).ToSql() 141 142 if err != nil { 143 return nil, err 144 } 145 146 query, args, err := sq.Update("workers"). 147 Set("state", string(WorkerStateLanded)). 148 Set("addr", nil). 149 Set("baggageclaim_url", nil). 150 Where(sq.Eq{ 151 "state": string(WorkerStateLanding), 152 }). 153 Where("name NOT IN ("+subQ+")", subQArgs...). 154 PlaceholderFormat(sq.Dollar). 155 Suffix("RETURNING name"). 156 ToSql() 157 158 if err != nil { 159 return []string{}, err 160 } 161 162 rows, err := lifecycle.conn.Query(query, args...) 163 if err != nil { 164 return nil, err 165 } 166 167 return workersAffected(rows) 168} 169 170func (lifecycle *workerLifecycle) GetWorkerStateByName() (map[string]WorkerState, error) { 171 rows, err := psql.Select(` 172 name, 173 state 174 `). 175 From("workers"). 176 RunWith(lifecycle.conn). 177 Query() 178 179 if err != nil { 180 return nil, err 181 } 182 183 defer Close(rows) 184 var name string 185 var state WorkerState 186 187 workerStateByName := make(map[string]WorkerState) 188 189 for rows.Next() { 190 err := rows.Scan( 191 &name, 192 &state, 193 ) 194 if err != nil { 195 return nil, err 196 } 197 workerStateByName[name] = state 198 } 199 200 return workerStateByName, nil 201 202} 203func workersAffected(rows *sql.Rows) ([]string, error) { 204 var ( 205 err error 206 workerNames []string 207 ) 208 209 defer Close(rows) 210 211 for rows.Next() { 212 var name string 213 214 err = rows.Scan(&name) 215 if err != nil { 216 return nil, err 217 } 218 219 workerNames = append(workerNames, name) 220 } 221 222 return workerNames, err 223} 224