1// +build integration
2
3package s3_test
4
5import (
6	"bytes"
7	"encoding/csv"
8	"io"
9	"strings"
10	"testing"
11
12	"github.com/aws/aws-sdk-go/aws"
13	"github.com/aws/aws-sdk-go/aws/awserr"
14	"github.com/aws/aws-sdk-go/internal/sdkio"
15	"github.com/aws/aws-sdk-go/service/s3"
16)
17
18func TestInteg_SelectObjectContent(t *testing.T) {
19	keyName := "selectObject.csv"
20
21	var header = []byte("A,B,C,D,E,F,G,H,I,J\n")
22	var recordRow = []byte("0,0,0.5,217.371,217.658,218.002,269.445,487.447,2.106,489.554\n")
23
24	buf := make([]byte, 0, 6*sdkio.MebiByte)
25	buf = append(buf, []byte(header)...)
26	for i := 0; i < (cap(buf)/len(recordRow))-1; i++ {
27		buf = append(buf, recordRow...)
28	}
29
30	// Put a mock CSV file to the S3 bucket so that its contents can be
31	// selected.
32	putTestContent(t, bytes.NewReader(buf), keyName)
33
34	resp, err := s3Svc.SelectObjectContent(&s3.SelectObjectContentInput{
35		Bucket:         &integMetadata.Buckets.Source.Name,
36		Key:            &keyName,
37		Expression:     aws.String("Select * from S3Object"),
38		ExpressionType: aws.String(s3.ExpressionTypeSql),
39		InputSerialization: &s3.InputSerialization{
40			CSV: &s3.CSVInput{
41				FieldDelimiter: aws.String(","),
42				FileHeaderInfo: aws.String(s3.FileHeaderInfoIgnore),
43			},
44		},
45		OutputSerialization: &s3.OutputSerialization{
46			CSV: &s3.CSVOutput{
47				FieldDelimiter: aws.String(","),
48			},
49		},
50	})
51	if err != nil {
52		t.Fatalf("expect no error, %v", err)
53	}
54	defer resp.EventStream.Close()
55
56	recReader, recWriter := io.Pipe()
57
58	var sum int64
59	var processed int64
60
61	var gotEndEvent bool
62	go func(w *io.PipeWriter, resp *s3.SelectObjectContentOutput) {
63		defer recWriter.Close()
64		var numRecordEvents int64
65		for event := range resp.EventStream.Events() {
66			switch tv := event.(type) {
67			case *s3.RecordsEvent:
68				n, err := recWriter.Write(tv.Payload)
69				if err != nil {
70					t.Logf("failed to write to record writer, %v, %v", n, err)
71				}
72				sum += int64(n)
73				numRecordEvents++
74			case *s3.StatsEvent:
75				processed = *tv.Details.BytesProcessed
76			case *s3.EndEvent:
77				gotEndEvent = true
78				t.Logf("s3.EndEvent received")
79			}
80		}
81		t.Logf("received %d record events", numRecordEvents)
82	}(recWriter, resp)
83
84	type Record []string
85
86	records := make(chan []Record)
87	go func(r io.Reader, records chan<- []Record, batchSize int) {
88		defer close(records)
89
90		csvReader := csv.NewReader(r)
91		var count int64
92
93		batch := make([]Record, 0, batchSize)
94		for {
95			count++
96			record, err := csvReader.Read()
97			if err != nil {
98				if _, ok := err.(*csv.ParseError); ok {
99					t.Logf("failed to decode record row, %v, %v", count, err)
100					continue
101				}
102				if err != io.EOF {
103					t.Logf("csv decode failed, %v", err)
104				}
105				err = nil
106				break
107			}
108			batch = append(batch, record)
109			if len(batch) >= batchSize {
110				records <- batch
111				batch = batch[0:0]
112			}
113		}
114		if len(batch) != 0 {
115			records <- batch
116		}
117	}(recReader, records, 10)
118
119	var count int64
120	for batch := range records {
121		// To simulate processing of a batch, add sleep delay.
122		count += int64(len(batch))
123
124		if err := resp.EventStream.Err(); err != nil {
125			t.Errorf("exect no error, got %v", err)
126		}
127	}
128
129	if !gotEndEvent {
130		t.Errorf("expected EndEvent, did not receive")
131	}
132
133	if e, a := int64(101474), count; e != a {
134		t.Errorf("expect %d records, got %d", e, a)
135	}
136
137	if sum == 0 {
138		t.Errorf("expect selected content, got none")
139	}
140
141	if processed == 0 {
142		t.Errorf("expect selected status bytes processed, got none")
143	}
144
145	if err := resp.EventStream.Err(); err != nil {
146		t.Fatalf("expect no error, got %v", err)
147	}
148}
149
150func TestInteg_SelectObjectContent_Error(t *testing.T) {
151	keyName := "negativeSelect.csv"
152
153	buf := make([]byte, 0, 6*sdkio.MebiByte)
154	buf = append(buf, []byte("name,number\n")...)
155	line := []byte("jj,0\n")
156	for i := 0; i < (cap(buf)/len(line))-2; i++ {
157		buf = append(buf, line...)
158	}
159	buf = append(buf, []byte("gg,NaN\n")...)
160
161	putTestContent(t, bytes.NewReader(buf), keyName)
162
163	resp, err := s3Svc.SelectObjectContent(&s3.SelectObjectContentInput{
164		Bucket:         &integMetadata.Buckets.Source.Name,
165		Key:            &keyName,
166		Expression:     aws.String("SELECT name FROM S3Object WHERE cast(number as int) < 1"),
167		ExpressionType: aws.String(s3.ExpressionTypeSql),
168		InputSerialization: &s3.InputSerialization{
169			CSV: &s3.CSVInput{
170				FileHeaderInfo: aws.String(s3.FileHeaderInfoUse),
171			},
172		},
173		OutputSerialization: &s3.OutputSerialization{
174			CSV: &s3.CSVOutput{
175				FieldDelimiter: aws.String(","),
176			},
177		},
178	})
179	if err != nil {
180		t.Fatalf("expect no error, %v", err)
181	}
182	defer resp.EventStream.Close()
183
184	var sum int64
185	for event := range resp.EventStream.Events() {
186		switch tv := event.(type) {
187		case *s3.RecordsEvent:
188			sum += int64(len(tv.Payload))
189		}
190	}
191
192	if sum == 0 {
193		t.Errorf("expect selected content")
194	}
195
196	err = resp.EventStream.Err()
197	if err == nil {
198		t.Fatalf("exepct error")
199	}
200
201	aerr := err.(awserr.Error)
202	if a := aerr.Code(); len(a) == 0 {
203		t.Errorf("expect, error code")
204	}
205	if a := aerr.Message(); len(a) == 0 {
206		t.Errorf("expect, error message")
207	}
208}
209
210func TestInteg_SelectObjectContent_Stream(t *testing.T) {
211	keyName := "selectGopher.csv"
212
213	buf := `name,number
214gopher,0
215odɥǝɹ,1
216`
217	// Put a mock CSV file to the S3 bucket so that its contents can be
218	// selected.
219	putTestContent(t, strings.NewReader(buf), keyName)
220
221	// Make the Select Object Content API request using the object uploaded.
222	resp, err := s3Svc.SelectObjectContent(&s3.SelectObjectContentInput{
223		Bucket:         &integMetadata.Buckets.Source.Name,
224		Key:            &keyName,
225		Expression:     aws.String("SELECT name FROM S3Object WHERE cast(number as int) < 1"),
226		ExpressionType: aws.String(s3.ExpressionTypeSql),
227		InputSerialization: &s3.InputSerialization{
228			CSV: &s3.CSVInput{
229				FileHeaderInfo: aws.String(s3.FileHeaderInfoUse),
230			},
231		},
232		OutputSerialization: &s3.OutputSerialization{
233			CSV: &s3.CSVOutput{},
234		},
235	})
236	if err != nil {
237		t.Fatalf("failed making API request, %v\n", err)
238	}
239	defer resp.EventStream.Close()
240
241	results, resultWriter := io.Pipe()
242	go func() {
243		defer resultWriter.Close()
244		for event := range resp.EventStream.Events() {
245			switch e := event.(type) {
246			case *s3.RecordsEvent:
247				resultWriter.Write(e.Payload)
248			case *s3.StatsEvent:
249				t.Logf("Processed %d bytes\n", *e.Details.BytesProcessed)
250			}
251		}
252	}()
253
254	// Printout the results
255	resReader := csv.NewReader(results)
256	for {
257		record, err := resReader.Read()
258		if err == io.EOF {
259			break
260		}
261		t.Log(record)
262	}
263
264	if err := resp.EventStream.Err(); err != nil {
265		t.Fatalf("reading from event stream failed, %v\n", err)
266	}
267}
268