1package migrations 2 3import ( 4 "database/sql" 5 "encoding/json" 6 "errors" 7 "fmt" 8) 9 10type V5JobConfigs []V5JobConfig 11 12type V5JobConfig struct { 13 Name string `json:"name"` 14 Public bool `json:"public,omitempty"` 15 DisableManualTrigger bool `json:"disable_manual_trigger,omitempty"` 16 Serial bool `json:"serial,omitempty"` 17 SerialGroups []string `json:"serial_groups,omitempty"` 18 RawMaxInFlight int `json:"max_in_flight,omitempty"` 19 20 Abort *V5PlanConfig `json:"on_abort,omitempty"` 21 Error *V5PlanConfig `json:"on_error,omitempty"` 22 Failure *V5PlanConfig `json:"on_failure,omitempty"` 23 Ensure *V5PlanConfig `json:"ensure,omitempty"` 24 Success *V5PlanConfig `json:"on_success,omitempty"` 25 26 Plan V5PlanSequence `json:"plan"` 27} 28 29type V5PlanSequence []V5PlanConfig 30 31func (config V5JobConfig) Plans() []V5PlanConfig { 32 plan := v5collectPlans(V5PlanConfig{ 33 Do: &config.Plan, 34 Abort: config.Abort, 35 Error: config.Error, 36 Ensure: config.Ensure, 37 Failure: config.Failure, 38 Success: config.Success, 39 }) 40 41 return plan 42} 43 44func (config V5JobConfig) MaxInFlight() int { 45 if config.Serial || len(config.SerialGroups) > 0 { 46 return 1 47 } 48 49 if config.RawMaxInFlight != 0 { 50 return config.RawMaxInFlight 51 } 52 53 return 0 54} 55 56func v5collectPlans(plan V5PlanConfig) []V5PlanConfig { 57 var plans []V5PlanConfig 58 59 if plan.Abort != nil { 60 plans = append(plans, v5collectPlans(*plan.Abort)...) 61 } 62 63 if plan.Error != nil { 64 plans = append(plans, v5collectPlans(*plan.Error)...) 65 } 66 67 if plan.Success != nil { 68 plans = append(plans, v5collectPlans(*plan.Success)...) 69 } 70 71 if plan.Failure != nil { 72 plans = append(plans, v5collectPlans(*plan.Failure)...) 73 } 74 75 if plan.Ensure != nil { 76 plans = append(plans, v5collectPlans(*plan.Ensure)...) 77 } 78 79 if plan.Try != nil { 80 plans = append(plans, v5collectPlans(*plan.Try)...) 81 } 82 83 if plan.Do != nil { 84 for _, p := range *plan.Do { 85 plans = append(plans, v5collectPlans(p)...) 86 } 87 } 88 89 if plan.Aggregate != nil { 90 for _, p := range *plan.Aggregate { 91 plans = append(plans, v5collectPlans(p)...) 92 } 93 } 94 95 if plan.InParallel != nil { 96 for _, p := range plan.InParallel.Steps { 97 plans = append(plans, v5collectPlans(p)...) 98 } 99 } 100 101 return append(plans, plan) 102} 103 104type V5InParallelConfig struct { 105 Steps V5PlanSequence `json:"steps,omitempty"` 106 Limit int `json:"limit,omitempty"` 107 FailFast bool `json:"fail_fast,omitempty"` 108} 109 110type V5VersionConfig struct { 111 Every bool 112 Latest bool 113 Pinned Version 114} 115 116const V5VersionLatest = "latest" 117const V5VersionEvery = "every" 118 119func (c *V5VersionConfig) UnmarshalJSON(version []byte) error { 120 var data interface{} 121 122 err := json.Unmarshal(version, &data) 123 if err != nil { 124 return err 125 } 126 127 switch actual := data.(type) { 128 case string: 129 c.Every = actual == "every" 130 c.Latest = actual == "latest" 131 case map[string]interface{}: 132 version := Version{} 133 134 for k, v := range actual { 135 if s, ok := v.(string); ok { 136 version[k] = s 137 continue 138 } 139 140 return fmt.Errorf("the value %v of %s is not a string", v, k) 141 } 142 143 c.Pinned = version 144 default: 145 return errors.New("unknown type for version") 146 } 147 148 return nil 149} 150 151func (c *V5VersionConfig) MarshalJSON() ([]byte, error) { 152 if c.Latest { 153 return json.Marshal(V5VersionLatest) 154 } 155 156 if c.Every { 157 return json.Marshal(V5VersionEvery) 158 } 159 160 if c.Pinned != nil { 161 return json.Marshal(c.Pinned) 162 } 163 164 return json.Marshal("") 165} 166 167type V5PlanConfig struct { 168 Do *V5PlanSequence `json:"do,omitempty"` 169 Aggregate *V5PlanSequence `json:"aggregate,omitempty"` 170 InParallel *V5InParallelConfig `json:"in_parallel,omitempty"` 171 Get string `json:"get,omitempty"` 172 Passed []string `json:"passed,omitempty"` 173 Trigger bool `json:"trigger,omitempty"` 174 Put string `json:"put,omitempty"` 175 Resource string `json:"resource,omitempty"` 176 Abort *V5PlanConfig `json:"on_abort,omitempty"` 177 Error *V5PlanConfig `json:"on_error,omitempty"` 178 Failure *V5PlanConfig `json:"on_failure,omitempty"` 179 Ensure *V5PlanConfig `json:"ensure,omitempty"` 180 Success *V5PlanConfig `json:"on_success,omitempty"` 181 Try *V5PlanConfig `json:"try,omitempty"` 182 Version *V5VersionConfig `json:"version,omitempty"` 183} 184 185func (self *migrations) Up_1574452410() error { 186 tx, err := self.DB.Begin() 187 if err != nil { 188 return err 189 } 190 191 defer tx.Rollback() 192 193 rows, err := tx.Query("SELECT pipeline_id, config, nonce FROM jobs WHERE active = true") 194 if err != nil { 195 return err 196 } 197 198 pipelineJobConfigs := make(map[int]V5JobConfigs) 199 for rows.Next() { 200 var configBlob []byte 201 var nonce sql.NullString 202 var pipelineID int 203 204 err = rows.Scan(&pipelineID, &configBlob, &nonce) 205 if err != nil { 206 return err 207 } 208 209 var noncense *string 210 if nonce.Valid { 211 noncense = &nonce.String 212 } 213 214 decrypted, err := self.Strategy.Decrypt(string(configBlob), noncense) 215 if err != nil { 216 return err 217 } 218 219 var config V5JobConfig 220 err = json.Unmarshal(decrypted, &config) 221 if err != nil { 222 return err 223 } 224 225 pipelineJobConfigs[pipelineID] = append(pipelineJobConfigs[pipelineID], config) 226 } 227 228 for pipelineID, jobConfigs := range pipelineJobConfigs { 229 resourceNameToID := make(map[string]int) 230 jobNameToID := make(map[string]int) 231 232 rows, err := tx.Query("SELECT id, name FROM resources WHERE pipeline_id = $1", pipelineID) 233 if err != nil { 234 return err 235 } 236 237 for rows.Next() { 238 var id int 239 var name string 240 241 err = rows.Scan(&id, &name) 242 if err != nil { 243 return err 244 } 245 246 resourceNameToID[name] = id 247 } 248 249 rows, err = tx.Query("SELECT id, name FROM jobs WHERE pipeline_id = $1", pipelineID) 250 if err != nil { 251 return err 252 } 253 254 for rows.Next() { 255 var id int 256 var name string 257 258 err = rows.Scan(&id, &name) 259 if err != nil { 260 return err 261 } 262 263 jobNameToID[name] = id 264 } 265 266 _, err = tx.Exec(` 267 DELETE FROM jobs_serial_groups 268 WHERE job_id in ( 269 SELECT j.id 270 FROM jobs j 271 WHERE j.pipeline_id = $1 272 )`, pipelineID) 273 if err != nil { 274 return err 275 } 276 277 for _, jobConfig := range jobConfigs { 278 if len(jobConfig.SerialGroups) != 0 { 279 for _, sg := range jobConfig.SerialGroups { 280 err = registerSerialGroup(tx, sg, jobNameToID[jobConfig.Name]) 281 if err != nil { 282 return err 283 } 284 } 285 } else { 286 if jobConfig.Serial || jobConfig.RawMaxInFlight > 0 { 287 err = registerSerialGroup(tx, jobConfig.Name, jobNameToID[jobConfig.Name]) 288 if err != nil { 289 return err 290 } 291 } 292 } 293 294 for _, plan := range jobConfig.Plans() { 295 if plan.Get != "" { 296 err = insertJobInput(tx, plan, jobConfig.Name, resourceNameToID, jobNameToID) 297 if err != nil { 298 return err 299 } 300 } else if plan.Put != "" { 301 err = insertJobOutput(tx, plan, jobConfig.Name, resourceNameToID, jobNameToID) 302 if err != nil { 303 return err 304 } 305 } 306 } 307 308 err = updateJob(tx, jobConfig, jobNameToID) 309 if err != nil { 310 return err 311 } 312 } 313 } 314 315 return tx.Commit() 316} 317 318func insertJobInput(tx *sql.Tx, plan V5PlanConfig, jobName string, resourceNameToID map[string]int, jobNameToID map[string]int) error { 319 if len(plan.Passed) != 0 { 320 for _, passedJob := range plan.Passed { 321 var resourceID int 322 if plan.Resource != "" { 323 resourceID = resourceNameToID[plan.Resource] 324 } else { 325 resourceID = resourceNameToID[plan.Get] 326 } 327 328 var version sql.NullString 329 if plan.Version != nil { 330 versionJSON, err := plan.Version.MarshalJSON() 331 if err != nil { 332 return err 333 } 334 335 version = sql.NullString{Valid: true, String: string(versionJSON)} 336 } 337 338 _, err := tx.Exec("INSERT INTO job_inputs (name, job_id, resource_id, passed_job_id, trigger, version) VALUES ($1, $2, $3, $4, $5, $6)", plan.Get, jobNameToID[jobName], resourceID, jobNameToID[passedJob], plan.Trigger, version) 339 if err != nil { 340 return err 341 } 342 } 343 } else { 344 var resourceID int 345 if plan.Resource != "" { 346 resourceID = resourceNameToID[plan.Resource] 347 } else { 348 resourceID = resourceNameToID[plan.Get] 349 } 350 351 var version sql.NullString 352 if plan.Version != nil { 353 versionJSON, err := plan.Version.MarshalJSON() 354 if err != nil { 355 return err 356 } 357 358 version = sql.NullString{Valid: true, String: string(versionJSON)} 359 } 360 361 _, err := tx.Exec("INSERT INTO job_inputs (name, job_id, resource_id, trigger, version) VALUES ($1, $2, $3, $4, $5)", plan.Get, jobNameToID[jobName], resourceID, plan.Trigger, version) 362 if err != nil { 363 return err 364 } 365 } 366 367 return nil 368} 369 370func insertJobOutput(tx *sql.Tx, plan V5PlanConfig, jobName string, resourceNameToID map[string]int, jobNameToID map[string]int) error { 371 var resourceID int 372 if plan.Resource != "" { 373 resourceID = resourceNameToID[plan.Resource] 374 } else { 375 resourceID = resourceNameToID[plan.Put] 376 } 377 378 _, err := tx.Exec("INSERT INTO job_outputs (name, job_id, resource_id) VALUES ($1, $2, $3)", plan.Put, jobNameToID[jobName], resourceID) 379 if err != nil { 380 return err 381 } 382 383 return nil 384} 385 386func updateJob(tx *sql.Tx, jobConfig V5JobConfig, jobNameToID map[string]int) error { 387 _, err := tx.Exec("UPDATE jobs SET public = $1, max_in_flight = $2, disable_manual_trigger = $3 WHERE id = $4", jobConfig.Public, jobConfig.MaxInFlight(), jobConfig.DisableManualTrigger, jobNameToID[jobConfig.Name]) 388 if err != nil { 389 return err 390 } 391 392 return nil 393} 394 395func registerSerialGroup(tx *sql.Tx, serialGroup string, jobID int) error { 396 _, err := tx.Exec("INSERT INTO jobs_serial_groups (serial_group, job_id) VALUES ($1, $2)", serialGroup, jobID) 397 return err 398} 399