1//go:build integration && perftest
2// +build integration,perftest
3
4package main
5
6import (
7	"context"
8	"flag"
9	"fmt"
10	"io"
11	"io/ioutil"
12	"os"
13	"os/signal"
14	"time"
15
16	"github.com/aws/aws-sdk-go/aws"
17	"github.com/aws/aws-sdk-go/aws/credentials"
18	"github.com/aws/aws-sdk-go/aws/request"
19	"github.com/aws/aws-sdk-go/aws/session"
20	"github.com/aws/aws-sdk-go/service/s3"
21)
22
23var config Config
24
25func init() {
26	config.SetupFlags("", flag.CommandLine)
27}
28
29func main() {
30	if err := flag.CommandLine.Parse(os.Args[1:]); err != nil {
31		flag.CommandLine.PrintDefaults()
32		exitErrorf(err, "failed to parse CLI commands")
33	}
34	if err := config.Validate(); err != nil {
35		flag.CommandLine.PrintDefaults()
36		exitErrorf(err, "invalid arguments")
37	}
38
39	client := NewClient(config.Client)
40
41	var creds *credentials.Credentials
42	if config.SDK.Anonymous {
43		creds = credentials.AnonymousCredentials
44	}
45
46	var endpoint *string
47	if v := config.Endpoint; len(v) != 0 {
48		endpoint = &v
49	}
50
51	sess, err := session.NewSession(&aws.Config{
52		HTTPClient:           client,
53		Endpoint:             endpoint,
54		Credentials:          creds,
55		S3Disable100Continue: aws.Bool(!config.SDK.ExpectContinue),
56	})
57	if err != nil {
58		exitErrorf(err, "failed to load config")
59	}
60
61	// Create context cancel for Ctrl+C/Interrupt
62	ctx, cancelFn := context.WithCancel(context.Background())
63	defer cancelFn()
64	sigCh := make(chan os.Signal, 1)
65	signal.Notify(sigCh, os.Interrupt)
66	go func() {
67		<-sigCh
68		cancelFn()
69	}()
70
71	// Use the request duration timeout if specified.
72	if config.RequestDuration != 0 {
73		var timeoutFn func()
74		ctx, timeoutFn = context.WithTimeout(ctx, config.RequestDuration)
75		defer timeoutFn()
76	}
77
78	logger := NewLogger(os.Stdout)
79
80	// Start making the requests.
81	svc := s3.New(sess)
82	var reqCount int64
83	errCount := 0
84	for {
85		trace := doRequest(ctx, reqCount, svc, config)
86		select {
87		case <-ctx.Done():
88			return
89		default:
90		}
91		logger.RecordTrace(trace)
92
93		if err := trace.Err(); err != nil {
94			fmt.Fprintf(os.Stderr, err.Error())
95			errCount++
96		} else {
97			errCount = 0
98		}
99
100		if config.RequestCount > 0 && reqCount == config.RequestCount {
101			return
102		}
103
104		reqCount++
105
106		// If the first several requests fail, exist, something is broken.
107		if errCount == 5 && reqCount == 5 {
108			exitErrorf(trace.Err(), "unable to make requests")
109		}
110
111		if config.RequestDelay > 0 {
112			time.Sleep(config.RequestDelay)
113		}
114	}
115}
116
117func doRequest(ctx context.Context, id int64, svc *s3.S3, config Config) *RequestTrace {
118	traceCtx := NewRequestTrace(ctx, id)
119	defer traceCtx.RequestDone()
120
121	resp, err := svc.GetObjectWithContext(traceCtx, &s3.GetObjectInput{
122		Bucket: &config.Bucket,
123		Key:    &config.Key,
124	}, func(r *request.Request) {
125		r.Handlers.Send.PushFront(traceCtx.OnSendAttempt)
126		r.Handlers.Complete.PushBack(traceCtx.OnCompleteRequest)
127		r.Handlers.CompleteAttempt.PushBack(traceCtx.OnCompleteAttempt)
128	})
129	if err != nil {
130		traceCtx.AppendError(fmt.Errorf("request failed, %v", err))
131		return traceCtx
132	}
133	defer resp.Body.Close()
134
135	if n, err := io.Copy(ioutil.Discard, resp.Body); err != nil {
136		traceCtx.AppendError(fmt.Errorf("read request body failed, read %v, %v", n, err))
137		return traceCtx
138	}
139
140	return traceCtx
141}
142
143func exitErrorf(err error, msg string, args ...interface{}) {
144	fmt.Fprintf(os.Stderr, "FAILED: %v\n"+msg+"\n", append([]interface{}{err}, args...)...)
145	os.Exit(1)
146}
147