1package integration_test
2
3import (
4	"archive/tar"
5	"compress/gzip"
6	"encoding/json"
7	"fmt"
8	"io/ioutil"
9	"net/http"
10	"os/exec"
11	"path/filepath"
12	"strings"
13	"time"
14
15	"github.com/concourse/concourse/atc"
16	"github.com/concourse/concourse/atc/event"
17	. "github.com/onsi/ginkgo"
18	. "github.com/onsi/gomega"
19	"github.com/onsi/gomega/gbytes"
20	"github.com/onsi/gomega/gexec"
21	"github.com/onsi/gomega/ghttp"
22	"github.com/vito/go-sse/sse"
23)
24
25var _ = Describe("Fly CLI", func() {
26	var buildDir string
27	var otherInputDir string
28
29	var streaming chan struct{}
30	var events chan atc.Event
31	var uploading chan struct{}
32	var uploadingTwo chan struct{}
33
34	var expectedPlan atc.Plan
35	var workerArtifact = atc.WorkerArtifact{
36		ID:   125,
37		Name: "some-dir",
38	}
39
40	BeforeEach(func() {
41		var err error
42
43		buildDir, err = ioutil.TempDir("", "fly-build-dir")
44		Expect(err).NotTo(HaveOccurred())
45
46		otherInputDir, err = ioutil.TempDir("", "fly-s3-asset-dir")
47		Expect(err).NotTo(HaveOccurred())
48
49		err = ioutil.WriteFile(
50			filepath.Join(buildDir, "task.yml"),
51			[]byte(`---
52platform: some-platform
53
54image_resource:
55  type: registry-image
56  source:
57    repository: ubuntu
58
59inputs:
60- name: some-input
61- name: some-other-input
62
63params:
64  FOO: bar
65  BAZ: buzz
66  X: 1
67
68run:
69  path: find
70  args: [.]
71`),
72			0644,
73		)
74		Expect(err).NotTo(HaveOccurred())
75
76		err = ioutil.WriteFile(
77			filepath.Join(otherInputDir, "s3-asset-file"),
78			[]byte(`blob`),
79			0644,
80		)
81		Expect(err).NotTo(HaveOccurred())
82
83		streaming = make(chan struct{})
84		events = make(chan atc.Event)
85
86		planFactory := atc.NewPlanFactory(0)
87
88		expectedPlan = planFactory.NewPlan(atc.DoPlan{
89			planFactory.NewPlan(atc.AggregatePlan{
90				planFactory.NewPlan(atc.ArtifactInputPlan{
91					ArtifactID: 125,
92					Name:       "some-input",
93				}),
94				planFactory.NewPlan(atc.ArtifactInputPlan{
95					ArtifactID: 125,
96					Name:       "some-other-input",
97				}),
98			}),
99			planFactory.NewPlan(atc.TaskPlan{
100				Name: "one-off",
101				Config: &atc.TaskConfig{
102					Platform: "some-platform",
103					ImageResource: &atc.ImageResource{
104						Type: "registry-image",
105						Source: atc.Source{
106							"repository": "ubuntu",
107						},
108					},
109					Inputs: []atc.TaskInputConfig{
110						{Name: "some-input"},
111						{Name: "some-other-input"},
112					},
113					Params: map[string]string{
114						"FOO": "bar",
115						"BAZ": "buzz",
116						"X":   "1",
117					},
118					Run: atc.TaskRunConfig{
119						Path: "find",
120						Args: []string{"."},
121					},
122				},
123			}),
124		})
125	})
126
127	JustBeforeEach(func() {
128		uploading = make(chan struct{})
129		uploadingTwo = make(chan struct{})
130
131		atcServer.RouteToHandler("POST", "/api/v1/teams/main/artifacts",
132			ghttp.CombineHandlers(
133				func(w http.ResponseWriter, req *http.Request) {
134					Expect(req.FormValue("platform")).To(Equal("some-platform"))
135
136					gr, err := gzip.NewReader(req.Body)
137					Expect(err).NotTo(HaveOccurred())
138
139					tr := tar.NewReader(gr)
140
141					hdr, err := tr.Next()
142					Expect(err).NotTo(HaveOccurred())
143
144					Expect(hdr.Name).To(Equal("./"))
145
146					hdr, err = tr.Next()
147					Expect(err).NotTo(HaveOccurred())
148
149					if strings.HasSuffix(hdr.Name, "task.yml") {
150						close(uploading)
151					} else if strings.HasSuffix(hdr.Name, "s3-asset-file") {
152						close(uploadingTwo)
153					}
154
155					Expect(hdr.Name).To(MatchRegexp("(./)?(task.yml|s3-asset-file)$"))
156				},
157				ghttp.RespondWith(201, `{"id":125}`),
158			),
159		)
160		atcServer.RouteToHandler("POST", "/api/v1/teams/main/builds",
161			ghttp.CombineHandlers(
162				ghttp.VerifyRequest("POST", "/api/v1/teams/main/builds"),
163				VerifyPlan(expectedPlan),
164				func(w http.ResponseWriter, r *http.Request) {
165					http.SetCookie(w, &http.Cookie{
166						Name:    "Some-Cookie",
167						Value:   "some-cookie-data",
168						Path:    "/",
169						Expires: time.Now().Add(1 * time.Minute),
170					})
171				},
172				ghttp.RespondWith(201, `{"id":128}`),
173			),
174		)
175		atcServer.RouteToHandler("GET", "/api/v1/builds/128/events",
176			ghttp.CombineHandlers(
177				ghttp.VerifyRequest("GET", "/api/v1/builds/128/events"),
178				func(w http.ResponseWriter, r *http.Request) {
179					flusher := w.(http.Flusher)
180
181					w.Header().Add("Content-Type", "text/event-stream; charset=utf-8")
182					w.Header().Add("Cache-Control", "no-cache, no-store, must-revalidate")
183					w.Header().Add("Connection", "keep-alive")
184
185					w.WriteHeader(http.StatusOK)
186
187					flusher.Flush()
188
189					close(streaming)
190
191					id := 0
192
193					for e := range events {
194						payload, err := json.Marshal(event.Message{Event: e})
195						Expect(err).NotTo(HaveOccurred())
196
197						event := sse.Event{
198							ID:   fmt.Sprintf("%d", id),
199							Name: "event",
200							Data: payload,
201						}
202
203						err = event.Write(w)
204						Expect(err).NotTo(HaveOccurred())
205
206						flusher.Flush()
207
208						id++
209					}
210
211					err := sse.Event{
212						Name: "end",
213					}.Write(w)
214					Expect(err).NotTo(HaveOccurred())
215				},
216			),
217		)
218		atcServer.RouteToHandler("GET", "/api/v1/builds/128/artifacts",
219			ghttp.RespondWithJSONEncoded(200, []atc.WorkerArtifact{workerArtifact}),
220		)
221
222	})
223
224	It("flies with multiple passengers", func() {
225		flyCmd := exec.Command(
226			flyPath, "-t", targetName, "e",
227			"--input", fmt.Sprintf("some-input=%s", buildDir),
228			"--input", fmt.Sprintf("some-other-input=%s", otherInputDir),
229			"--config", filepath.Join(buildDir, "task.yml"),
230		)
231
232		sess, err := gexec.Start(flyCmd, GinkgoWriter, GinkgoWriter)
233		Expect(err).NotTo(HaveOccurred())
234
235		Eventually(streaming).Should(BeClosed())
236		Eventually(uploading).Should(BeClosed())
237		Eventually(uploadingTwo).Should(BeClosed())
238
239		events <- event.Log{Payload: "sup"}
240		close(events)
241
242		Eventually(sess.Out).Should(gbytes.Say("sup"))
243
244		<-sess.Exited
245		Expect(sess).To(gexec.Exit(0))
246	})
247})
248