1package migrations
2
3import (
4	"database/sql"
5	"encoding/json"
6	"errors"
7	"fmt"
8)
9
10type V5JobConfigs []V5JobConfig
11
12type V5JobConfig struct {
13	Name                 string   `json:"name"`
14	Public               bool     `json:"public,omitempty"`
15	DisableManualTrigger bool     `json:"disable_manual_trigger,omitempty"`
16	Serial               bool     `json:"serial,omitempty"`
17	SerialGroups         []string `json:"serial_groups,omitempty"`
18	RawMaxInFlight       int      `json:"max_in_flight,omitempty"`
19
20	Abort   *V5PlanConfig `json:"on_abort,omitempty"`
21	Error   *V5PlanConfig `json:"on_error,omitempty"`
22	Failure *V5PlanConfig `json:"on_failure,omitempty"`
23	Ensure  *V5PlanConfig `json:"ensure,omitempty"`
24	Success *V5PlanConfig `json:"on_success,omitempty"`
25
26	Plan V5PlanSequence `json:"plan"`
27}
28
29type V5PlanSequence []V5PlanConfig
30
31func (config V5JobConfig) Plans() []V5PlanConfig {
32	plan := v5collectPlans(V5PlanConfig{
33		Do:      &config.Plan,
34		Abort:   config.Abort,
35		Error:   config.Error,
36		Ensure:  config.Ensure,
37		Failure: config.Failure,
38		Success: config.Success,
39	})
40
41	return plan
42}
43
44func (config V5JobConfig) MaxInFlight() int {
45	if config.Serial || len(config.SerialGroups) > 0 {
46		return 1
47	}
48
49	if config.RawMaxInFlight != 0 {
50		return config.RawMaxInFlight
51	}
52
53	return 0
54}
55
56func v5collectPlans(plan V5PlanConfig) []V5PlanConfig {
57	var plans []V5PlanConfig
58
59	if plan.Abort != nil {
60		plans = append(plans, v5collectPlans(*plan.Abort)...)
61	}
62
63	if plan.Error != nil {
64		plans = append(plans, v5collectPlans(*plan.Error)...)
65	}
66
67	if plan.Success != nil {
68		plans = append(plans, v5collectPlans(*plan.Success)...)
69	}
70
71	if plan.Failure != nil {
72		plans = append(plans, v5collectPlans(*plan.Failure)...)
73	}
74
75	if plan.Ensure != nil {
76		plans = append(plans, v5collectPlans(*plan.Ensure)...)
77	}
78
79	if plan.Try != nil {
80		plans = append(plans, v5collectPlans(*plan.Try)...)
81	}
82
83	if plan.Do != nil {
84		for _, p := range *plan.Do {
85			plans = append(plans, v5collectPlans(p)...)
86		}
87	}
88
89	if plan.Aggregate != nil {
90		for _, p := range *plan.Aggregate {
91			plans = append(plans, v5collectPlans(p)...)
92		}
93	}
94
95	if plan.InParallel != nil {
96		for _, p := range plan.InParallel.Steps {
97			plans = append(plans, v5collectPlans(p)...)
98		}
99	}
100
101	return append(plans, plan)
102}
103
104type V5InParallelConfig struct {
105	Steps    V5PlanSequence `json:"steps,omitempty"`
106	Limit    int            `json:"limit,omitempty"`
107	FailFast bool           `json:"fail_fast,omitempty"`
108}
109
110type V5VersionConfig struct {
111	Every  bool
112	Latest bool
113	Pinned Version
114}
115
116const V5VersionLatest = "latest"
117const V5VersionEvery = "every"
118
119func (c *V5VersionConfig) UnmarshalJSON(version []byte) error {
120	var data interface{}
121
122	err := json.Unmarshal(version, &data)
123	if err != nil {
124		return err
125	}
126
127	switch actual := data.(type) {
128	case string:
129		c.Every = actual == "every"
130		c.Latest = actual == "latest"
131	case map[string]interface{}:
132		version := Version{}
133
134		for k, v := range actual {
135			if s, ok := v.(string); ok {
136				version[k] = s
137				continue
138			}
139
140			return fmt.Errorf("the value %v of %s is not a string", v, k)
141		}
142
143		c.Pinned = version
144	default:
145		return errors.New("unknown type for version")
146	}
147
148	return nil
149}
150
151func (c *V5VersionConfig) MarshalJSON() ([]byte, error) {
152	if c.Latest {
153		return json.Marshal(V5VersionLatest)
154	}
155
156	if c.Every {
157		return json.Marshal(V5VersionEvery)
158	}
159
160	if c.Pinned != nil {
161		return json.Marshal(c.Pinned)
162	}
163
164	return json.Marshal("")
165}
166
167type V5PlanConfig struct {
168	Do         *V5PlanSequence     `json:"do,omitempty"`
169	Aggregate  *V5PlanSequence     `json:"aggregate,omitempty"`
170	InParallel *V5InParallelConfig `json:"in_parallel,omitempty"`
171	Get        string              `json:"get,omitempty"`
172	Passed     []string            `json:"passed,omitempty"`
173	Trigger    bool                `json:"trigger,omitempty"`
174	Put        string              `json:"put,omitempty"`
175	Resource   string              `json:"resource,omitempty"`
176	Abort      *V5PlanConfig       `json:"on_abort,omitempty"`
177	Error      *V5PlanConfig       `json:"on_error,omitempty"`
178	Failure    *V5PlanConfig       `json:"on_failure,omitempty"`
179	Ensure     *V5PlanConfig       `json:"ensure,omitempty"`
180	Success    *V5PlanConfig       `json:"on_success,omitempty"`
181	Try        *V5PlanConfig       `json:"try,omitempty"`
182	Version    *V5VersionConfig    `json:"version,omitempty"`
183}
184
185func (self *migrations) Up_1574452410() error {
186	tx, err := self.DB.Begin()
187	if err != nil {
188		return err
189	}
190
191	defer tx.Rollback()
192
193	rows, err := tx.Query("SELECT pipeline_id, config, nonce FROM jobs WHERE active = true")
194	if err != nil {
195		return err
196	}
197
198	pipelineJobConfigs := make(map[int]V5JobConfigs)
199	for rows.Next() {
200		var configBlob []byte
201		var nonce sql.NullString
202		var pipelineID int
203
204		err = rows.Scan(&pipelineID, &configBlob, &nonce)
205		if err != nil {
206			return err
207		}
208
209		var noncense *string
210		if nonce.Valid {
211			noncense = &nonce.String
212		}
213
214		decrypted, err := self.Strategy.Decrypt(string(configBlob), noncense)
215		if err != nil {
216			return err
217		}
218
219		var config V5JobConfig
220		err = json.Unmarshal(decrypted, &config)
221		if err != nil {
222			return err
223		}
224
225		pipelineJobConfigs[pipelineID] = append(pipelineJobConfigs[pipelineID], config)
226	}
227
228	for pipelineID, jobConfigs := range pipelineJobConfigs {
229		resourceNameToID := make(map[string]int)
230		jobNameToID := make(map[string]int)
231
232		rows, err := tx.Query("SELECT id, name FROM resources WHERE pipeline_id = $1", pipelineID)
233		if err != nil {
234			return err
235		}
236
237		for rows.Next() {
238			var id int
239			var name string
240
241			err = rows.Scan(&id, &name)
242			if err != nil {
243				return err
244			}
245
246			resourceNameToID[name] = id
247		}
248
249		rows, err = tx.Query("SELECT id, name FROM jobs WHERE pipeline_id = $1", pipelineID)
250		if err != nil {
251			return err
252		}
253
254		for rows.Next() {
255			var id int
256			var name string
257
258			err = rows.Scan(&id, &name)
259			if err != nil {
260				return err
261			}
262
263			jobNameToID[name] = id
264		}
265
266		_, err = tx.Exec(`
267			DELETE FROM jobs_serial_groups
268			WHERE job_id in (
269				SELECT j.id
270				FROM jobs j
271				WHERE j.pipeline_id = $1
272			)`, pipelineID)
273		if err != nil {
274			return err
275		}
276
277		for _, jobConfig := range jobConfigs {
278			if len(jobConfig.SerialGroups) != 0 {
279				for _, sg := range jobConfig.SerialGroups {
280					err = registerSerialGroup(tx, sg, jobNameToID[jobConfig.Name])
281					if err != nil {
282						return err
283					}
284				}
285			} else {
286				if jobConfig.Serial || jobConfig.RawMaxInFlight > 0 {
287					err = registerSerialGroup(tx, jobConfig.Name, jobNameToID[jobConfig.Name])
288					if err != nil {
289						return err
290					}
291				}
292			}
293
294			for _, plan := range jobConfig.Plans() {
295				if plan.Get != "" {
296					err = insertJobInput(tx, plan, jobConfig.Name, resourceNameToID, jobNameToID)
297					if err != nil {
298						return err
299					}
300				} else if plan.Put != "" {
301					err = insertJobOutput(tx, plan, jobConfig.Name, resourceNameToID, jobNameToID)
302					if err != nil {
303						return err
304					}
305				}
306			}
307
308			err = updateJob(tx, jobConfig, jobNameToID)
309			if err != nil {
310				return err
311			}
312		}
313	}
314
315	return tx.Commit()
316}
317
318func insertJobInput(tx *sql.Tx, plan V5PlanConfig, jobName string, resourceNameToID map[string]int, jobNameToID map[string]int) error {
319	if len(plan.Passed) != 0 {
320		for _, passedJob := range plan.Passed {
321			var resourceID int
322			if plan.Resource != "" {
323				resourceID = resourceNameToID[plan.Resource]
324			} else {
325				resourceID = resourceNameToID[plan.Get]
326			}
327
328			var version sql.NullString
329			if plan.Version != nil {
330				versionJSON, err := plan.Version.MarshalJSON()
331				if err != nil {
332					return err
333				}
334
335				version = sql.NullString{Valid: true, String: string(versionJSON)}
336			}
337
338			_, err := tx.Exec("INSERT INTO job_inputs (name, job_id, resource_id, passed_job_id, trigger, version) VALUES ($1, $2, $3, $4, $5, $6)", plan.Get, jobNameToID[jobName], resourceID, jobNameToID[passedJob], plan.Trigger, version)
339			if err != nil {
340				return err
341			}
342		}
343	} else {
344		var resourceID int
345		if plan.Resource != "" {
346			resourceID = resourceNameToID[plan.Resource]
347		} else {
348			resourceID = resourceNameToID[plan.Get]
349		}
350
351		var version sql.NullString
352		if plan.Version != nil {
353			versionJSON, err := plan.Version.MarshalJSON()
354			if err != nil {
355				return err
356			}
357
358			version = sql.NullString{Valid: true, String: string(versionJSON)}
359		}
360
361		_, err := tx.Exec("INSERT INTO job_inputs (name, job_id, resource_id, trigger, version) VALUES ($1, $2, $3, $4, $5)", plan.Get, jobNameToID[jobName], resourceID, plan.Trigger, version)
362		if err != nil {
363			return err
364		}
365	}
366
367	return nil
368}
369
370func insertJobOutput(tx *sql.Tx, plan V5PlanConfig, jobName string, resourceNameToID map[string]int, jobNameToID map[string]int) error {
371	var resourceID int
372	if plan.Resource != "" {
373		resourceID = resourceNameToID[plan.Resource]
374	} else {
375		resourceID = resourceNameToID[plan.Put]
376	}
377
378	_, err := tx.Exec("INSERT INTO job_outputs (name, job_id, resource_id) VALUES ($1, $2, $3)", plan.Put, jobNameToID[jobName], resourceID)
379	if err != nil {
380		return err
381	}
382
383	return nil
384}
385
386func updateJob(tx *sql.Tx, jobConfig V5JobConfig, jobNameToID map[string]int) error {
387	_, err := tx.Exec("UPDATE jobs SET public = $1, max_in_flight = $2, disable_manual_trigger = $3 WHERE id = $4", jobConfig.Public, jobConfig.MaxInFlight(), jobConfig.DisableManualTrigger, jobNameToID[jobConfig.Name])
388	if err != nil {
389		return err
390	}
391
392	return nil
393}
394
395func registerSerialGroup(tx *sql.Tx, serialGroup string, jobID int) error {
396	_, err := tx.Exec("INSERT INTO jobs_serial_groups (serial_group, job_id) VALUES ($1, $2)", serialGroup, jobID)
397	return err
398}
399