1// +build integration 2 3package s3_test 4 5import ( 6 "bytes" 7 "encoding/csv" 8 "io" 9 "strings" 10 "testing" 11 12 "github.com/aws/aws-sdk-go/aws" 13 "github.com/aws/aws-sdk-go/aws/awserr" 14 "github.com/aws/aws-sdk-go/internal/sdkio" 15 "github.com/aws/aws-sdk-go/service/s3" 16) 17 18func TestInteg_SelectObjectContent(t *testing.T) { 19 keyName := "selectObject.csv" 20 21 var header = []byte("A,B,C,D,E,F,G,H,I,J\n") 22 var recordRow = []byte("0,0,0.5,217.371,217.658,218.002,269.445,487.447,2.106,489.554\n") 23 24 buf := make([]byte, 0, 6*sdkio.MebiByte) 25 buf = append(buf, []byte(header)...) 26 for i := 0; i < (cap(buf)/len(recordRow))-1; i++ { 27 buf = append(buf, recordRow...) 28 } 29 30 // Put a mock CSV file to the S3 bucket so that its contents can be 31 // selected. 32 putTestContent(t, bytes.NewReader(buf), keyName) 33 34 resp, err := s3Svc.SelectObjectContent(&s3.SelectObjectContentInput{ 35 Bucket: &integMetadata.Buckets.Source.Name, 36 Key: &keyName, 37 Expression: aws.String("Select * from S3Object"), 38 ExpressionType: aws.String(s3.ExpressionTypeSql), 39 InputSerialization: &s3.InputSerialization{ 40 CSV: &s3.CSVInput{ 41 FieldDelimiter: aws.String(","), 42 FileHeaderInfo: aws.String(s3.FileHeaderInfoIgnore), 43 }, 44 }, 45 OutputSerialization: &s3.OutputSerialization{ 46 CSV: &s3.CSVOutput{ 47 FieldDelimiter: aws.String(","), 48 }, 49 }, 50 }) 51 if err != nil { 52 t.Fatalf("expect no error, %v", err) 53 } 54 defer resp.EventStream.Close() 55 56 recReader, recWriter := io.Pipe() 57 58 var sum int64 59 var processed int64 60 61 var gotEndEvent bool 62 go func(w *io.PipeWriter, resp *s3.SelectObjectContentOutput) { 63 defer recWriter.Close() 64 var numRecordEvents int64 65 for event := range resp.EventStream.Events() { 66 switch tv := event.(type) { 67 case *s3.RecordsEvent: 68 n, err := recWriter.Write(tv.Payload) 69 if err != nil { 70 t.Logf("failed to write to record writer, %v, %v", n, err) 71 } 72 sum += int64(n) 73 numRecordEvents++ 74 case *s3.StatsEvent: 75 processed = *tv.Details.BytesProcessed 76 case *s3.EndEvent: 77 gotEndEvent = true 78 t.Logf("s3.EndEvent received") 79 } 80 } 81 t.Logf("received %d record events", numRecordEvents) 82 }(recWriter, resp) 83 84 type Record []string 85 86 records := make(chan []Record) 87 go func(r io.Reader, records chan<- []Record, batchSize int) { 88 defer close(records) 89 90 csvReader := csv.NewReader(r) 91 var count int64 92 93 batch := make([]Record, 0, batchSize) 94 for { 95 count++ 96 record, err := csvReader.Read() 97 if err != nil { 98 if _, ok := err.(*csv.ParseError); ok { 99 t.Logf("failed to decode record row, %v, %v", count, err) 100 continue 101 } 102 if err != io.EOF { 103 t.Logf("csv decode failed, %v", err) 104 } 105 err = nil 106 break 107 } 108 batch = append(batch, record) 109 if len(batch) >= batchSize { 110 records <- batch 111 batch = batch[0:0] 112 } 113 } 114 if len(batch) != 0 { 115 records <- batch 116 } 117 }(recReader, records, 10) 118 119 var count int64 120 for batch := range records { 121 // To simulate processing of a batch, add sleep delay. 122 count += int64(len(batch)) 123 124 if err := resp.EventStream.Err(); err != nil { 125 t.Errorf("exect no error, got %v", err) 126 } 127 } 128 129 if !gotEndEvent { 130 t.Errorf("expected EndEvent, did not receive") 131 } 132 133 if e, a := int64(101474), count; e != a { 134 t.Errorf("expect %d records, got %d", e, a) 135 } 136 137 if sum == 0 { 138 t.Errorf("expect selected content, got none") 139 } 140 141 if processed == 0 { 142 t.Errorf("expect selected status bytes processed, got none") 143 } 144 145 if err := resp.EventStream.Err(); err != nil { 146 t.Fatalf("expect no error, got %v", err) 147 } 148} 149 150func TestInteg_SelectObjectContent_Error(t *testing.T) { 151 keyName := "negativeSelect.csv" 152 153 buf := make([]byte, 0, 6*sdkio.MebiByte) 154 buf = append(buf, []byte("name,number\n")...) 155 line := []byte("jj,0\n") 156 for i := 0; i < (cap(buf)/len(line))-2; i++ { 157 buf = append(buf, line...) 158 } 159 buf = append(buf, []byte("gg,NaN\n")...) 160 161 putTestContent(t, bytes.NewReader(buf), keyName) 162 163 resp, err := s3Svc.SelectObjectContent(&s3.SelectObjectContentInput{ 164 Bucket: &integMetadata.Buckets.Source.Name, 165 Key: &keyName, 166 Expression: aws.String("SELECT name FROM S3Object WHERE cast(number as int) < 1"), 167 ExpressionType: aws.String(s3.ExpressionTypeSql), 168 InputSerialization: &s3.InputSerialization{ 169 CSV: &s3.CSVInput{ 170 FileHeaderInfo: aws.String(s3.FileHeaderInfoUse), 171 }, 172 }, 173 OutputSerialization: &s3.OutputSerialization{ 174 CSV: &s3.CSVOutput{ 175 FieldDelimiter: aws.String(","), 176 }, 177 }, 178 }) 179 if err != nil { 180 t.Fatalf("expect no error, %v", err) 181 } 182 defer resp.EventStream.Close() 183 184 var sum int64 185 for event := range resp.EventStream.Events() { 186 switch tv := event.(type) { 187 case *s3.RecordsEvent: 188 sum += int64(len(tv.Payload)) 189 } 190 } 191 192 if sum == 0 { 193 t.Errorf("expect selected content") 194 } 195 196 err = resp.EventStream.Err() 197 if err == nil { 198 t.Fatalf("exepct error") 199 } 200 201 aerr := err.(awserr.Error) 202 if a := aerr.Code(); len(a) == 0 { 203 t.Errorf("expect, error code") 204 } 205 if a := aerr.Message(); len(a) == 0 { 206 t.Errorf("expect, error message") 207 } 208} 209 210func TestInteg_SelectObjectContent_Stream(t *testing.T) { 211 keyName := "selectGopher.csv" 212 213 buf := `name,number 214gopher,0 215ᵷodɥǝɹ,1 216` 217 // Put a mock CSV file to the S3 bucket so that its contents can be 218 // selected. 219 putTestContent(t, strings.NewReader(buf), keyName) 220 221 // Make the Select Object Content API request using the object uploaded. 222 resp, err := s3Svc.SelectObjectContent(&s3.SelectObjectContentInput{ 223 Bucket: &integMetadata.Buckets.Source.Name, 224 Key: &keyName, 225 Expression: aws.String("SELECT name FROM S3Object WHERE cast(number as int) < 1"), 226 ExpressionType: aws.String(s3.ExpressionTypeSql), 227 InputSerialization: &s3.InputSerialization{ 228 CSV: &s3.CSVInput{ 229 FileHeaderInfo: aws.String(s3.FileHeaderInfoUse), 230 }, 231 }, 232 OutputSerialization: &s3.OutputSerialization{ 233 CSV: &s3.CSVOutput{}, 234 }, 235 }) 236 if err != nil { 237 t.Fatalf("failed making API request, %v\n", err) 238 } 239 defer resp.EventStream.Close() 240 241 results, resultWriter := io.Pipe() 242 go func() { 243 defer resultWriter.Close() 244 for event := range resp.EventStream.Events() { 245 switch e := event.(type) { 246 case *s3.RecordsEvent: 247 resultWriter.Write(e.Payload) 248 case *s3.StatsEvent: 249 t.Logf("Processed %d bytes\n", *e.Details.BytesProcessed) 250 } 251 } 252 }() 253 254 // Printout the results 255 resReader := csv.NewReader(results) 256 for { 257 record, err := resReader.Read() 258 if err == io.EOF { 259 break 260 } 261 t.Log(record) 262 } 263 264 if err := resp.EventStream.Err(); err != nil { 265 t.Fatalf("reading from event stream failed, %v\n", err) 266 } 267} 268