1package integration_test
2
import (
	"archive/tar"
	"compress/gzip"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net/http"
	"os"
	"os/exec"
	"path/filepath"
	"time"

	"github.com/concourse/concourse/atc"
	"github.com/concourse/concourse/atc/event"
	. "github.com/onsi/ginkgo"
	. "github.com/onsi/gomega"
	"github.com/onsi/gomega/gbytes"
	"github.com/onsi/gomega/gexec"
	"github.com/onsi/gomega/ghttp"
	"github.com/vito/go-sse/sse"
)
23
24var _ = Describe("Fly CLI", func() {
25	var buildDir string
26	var otherInputDir string
27
28	var streaming chan struct{}
29	var events chan atc.Event
30	var uploading chan struct{}
31
32	var expectedPlan atc.Plan
33	var workerArtifact = atc.WorkerArtifact{
34		ID:   125,
35		Name: "some-dir",
36	}
37
38	BeforeEach(func() {
39		var err error
40
41		buildDir, err = ioutil.TempDir("", "fly-build-dir")
42		Expect(err).NotTo(HaveOccurred())
43
44		otherInputDir, err = ioutil.TempDir("", "fly-s3-asset-dir")
45		Expect(err).NotTo(HaveOccurred())
46
47		err = ioutil.WriteFile(
48			filepath.Join(buildDir, "task.yml"),
49			[]byte(`---
50platform: some-platform
51
52image_resource:
53  type: registry-image
54  source:
55    repository: ubuntu
56
57inputs:
58- name: some-input
59- name: some-other-input
60
61params:
62  FOO: bar
63  BAZ: buzz
64  X: 1
65
66run:
67  path: find
68  args: [.]
69`),
70			0644,
71		)
72		Expect(err).NotTo(HaveOccurred())
73
74		err = ioutil.WriteFile(
75			filepath.Join(otherInputDir, "s3-asset-file"),
76			[]byte(`blob`),
77			0644,
78		)
79		Expect(err).NotTo(HaveOccurred())
80
81		streaming = make(chan struct{})
82		events = make(chan atc.Event)
83
84		planFactory := atc.NewPlanFactory(0)
85
86		expectedPlan = planFactory.NewPlan(atc.DoPlan{
87			planFactory.NewPlan(atc.AggregatePlan{
88				planFactory.NewPlan(atc.ArtifactInputPlan{
89					ArtifactID: 125,
90					Name:       "some-input",
91				}),
92				planFactory.NewPlan(atc.GetPlan{
93					Name:    "some-other-input",
94					Type:    "git",
95					Source:  atc.Source{"uri": "https://example.com"},
96					Params:  atc.Params{"some": "other-params"},
97					Version: &atc.Version{"some": "other-version"},
98					Tags:    atc.Tags{"tag-1", "tag-2"},
99				}),
100			}),
101			planFactory.NewPlan(atc.TaskPlan{
102				Name: "one-off",
103				Config: &atc.TaskConfig{
104					Platform: "some-platform",
105					ImageResource: &atc.ImageResource{
106						Type: "registry-image",
107						Source: atc.Source{
108							"repository": "ubuntu",
109						},
110					},
111					Inputs: []atc.TaskInputConfig{
112						{Name: "some-input"},
113						{Name: "some-other-input"},
114					},
115					Params: map[string]string{
116						"FOO": "bar",
117						"BAZ": "buzz",
118						"X":   "1",
119					},
120					Run: atc.TaskRunConfig{
121						Path: "find",
122						Args: []string{"."},
123					},
124				},
125			}),
126		})
127	})
128
129	JustBeforeEach(func() {
130		uploading = make(chan struct{})
131		atcServer.RouteToHandler("GET", "/api/v1/teams/main/pipelines/some-pipeline/jobs/some-job/inputs",
132			ghttp.CombineHandlers(
133				ghttp.VerifyRequest("GET", "/api/v1/teams/main/pipelines/some-pipeline/jobs/some-job/inputs"),
134				ghttp.RespondWithJSONEncoded(http.StatusOK, []atc.BuildInput{
135					{
136						Name:     "some-input",
137						Type:     "git",
138						Resource: "some-resource",
139						Source:   atc.Source{"uri": "https://internet.com"},
140						Params:   atc.Params{"some": "params"},
141						Version:  atc.Version{"some": "version"},
142						Tags:     atc.Tags{"tag-1", "tag-2"},
143					},
144					{
145						Name:     "some-other-input",
146						Type:     "git",
147						Resource: "some-other-resource",
148						Source:   atc.Source{"uri": "https://example.com"},
149						Params:   atc.Params{"some": "other-params"},
150						Version:  atc.Version{"some": "other-version"},
151						Tags:     atc.Tags{"tag-1", "tag-2"},
152					},
153				}),
154			),
155		)
156		atcServer.RouteToHandler("GET", "/api/v1/teams/main/pipelines/some-pipeline/resource-types",
157			ghttp.CombineHandlers(
158				ghttp.VerifyRequest("GET", "/api/v1/teams/main/pipelines/some-pipeline/resource-types"),
159				ghttp.RespondWithJSONEncoded(http.StatusOK, nil),
160			),
161		)
162		atcServer.RouteToHandler("POST", "/api/v1/teams/main/artifacts",
163			ghttp.CombineHandlers(
164				func(w http.ResponseWriter, req *http.Request) {
165					close(uploading)
166
167					Expect(req.FormValue("platform")).To(Equal("some-platform"))
168
169					gr, err := gzip.NewReader(req.Body)
170					Expect(err).NotTo(HaveOccurred())
171
172					tr := tar.NewReader(gr)
173
174					hdr, err := tr.Next()
175					Expect(err).NotTo(HaveOccurred())
176
177					Expect(hdr.Name).To(Equal("./"))
178
179					hdr, err = tr.Next()
180					Expect(err).NotTo(HaveOccurred())
181
182					Expect(hdr.Name).To(MatchRegexp("(./)?task.yml$"))
183				},
184				ghttp.RespondWithJSONEncoded(201, workerArtifact),
185			),
186		)
187		atcServer.RouteToHandler("POST", "/api/v1/teams/main/pipelines/some-pipeline/builds",
188			ghttp.CombineHandlers(
189				ghttp.VerifyRequest("POST", "/api/v1/teams/main/pipelines/some-pipeline/builds"),
190				VerifyPlan(expectedPlan),
191				func(w http.ResponseWriter, r *http.Request) {
192					http.SetCookie(w, &http.Cookie{
193						Name:    "Some-Cookie",
194						Value:   "some-cookie-data",
195						Path:    "/",
196						Expires: time.Now().Add(1 * time.Minute),
197					})
198				},
199				ghttp.RespondWith(201, `{"id":128}`),
200			),
201		)
202		atcServer.RouteToHandler("GET", "/api/v1/builds/128/events",
203			ghttp.CombineHandlers(
204				ghttp.VerifyRequest("GET", "/api/v1/builds/128/events"),
205				func(w http.ResponseWriter, r *http.Request) {
206					flusher := w.(http.Flusher)
207
208					w.Header().Add("Content-Type", "text/event-stream; charset=utf-8")
209					w.Header().Add("Cache-Control", "no-cache, no-store, must-revalidate")
210					w.Header().Add("Connection", "keep-alive")
211
212					w.WriteHeader(http.StatusOK)
213
214					flusher.Flush()
215
216					close(streaming)
217
218					id := 0
219
220					for e := range events {
221						payload, err := json.Marshal(event.Message{Event: e})
222						Expect(err).NotTo(HaveOccurred())
223
224						event := sse.Event{
225							ID:   fmt.Sprintf("%d", id),
226							Name: "event",
227							Data: payload,
228						}
229
230						err = event.Write(w)
231						Expect(err).NotTo(HaveOccurred())
232
233						flusher.Flush()
234
235						id++
236					}
237
238					err := sse.Event{
239						Name: "end",
240					}.Write(w)
241					Expect(err).NotTo(HaveOccurred())
242				},
243			),
244		)
245		atcServer.RouteToHandler("GET", "/api/v1/builds/128/artifacts",
246			ghttp.RespondWithJSONEncoded(200, []atc.WorkerArtifact{workerArtifact}),
247		)
248
249	})
250
251	It("can base inputs on a job in the pipeline", func() {
252		flyCmd := exec.Command(
253			flyPath, "-t", targetName, "e",
254			"--inputs-from", "some-pipeline/some-job",
255			"--input", fmt.Sprintf("some-input=%s", buildDir),
256			"--config", filepath.Join(buildDir, "task.yml"),
257		)
258
259		sess, err := gexec.Start(flyCmd, GinkgoWriter, GinkgoWriter)
260		Expect(err).NotTo(HaveOccurred())
261
262		Eventually(streaming).Should(BeClosed())
263		Eventually(uploading).Should(BeClosed())
264
265		events <- event.Log{Payload: "sup"}
266		close(events)
267
268		Eventually(sess.Out).Should(gbytes.Say("sup"))
269
270		<-sess.Exited
271		Expect(sess).To(gexec.Exit(0))
272	})
273})
274