1//go:build integration && perftest 2// +build integration,perftest 3 4package main 5 6import ( 7 "context" 8 "flag" 9 "fmt" 10 "io" 11 "io/ioutil" 12 "os" 13 "os/signal" 14 "time" 15 16 "github.com/aws/aws-sdk-go/aws" 17 "github.com/aws/aws-sdk-go/aws/credentials" 18 "github.com/aws/aws-sdk-go/aws/request" 19 "github.com/aws/aws-sdk-go/aws/session" 20 "github.com/aws/aws-sdk-go/service/s3" 21) 22 23var config Config 24 25func init() { 26 config.SetupFlags("", flag.CommandLine) 27} 28 29func main() { 30 if err := flag.CommandLine.Parse(os.Args[1:]); err != nil { 31 flag.CommandLine.PrintDefaults() 32 exitErrorf(err, "failed to parse CLI commands") 33 } 34 if err := config.Validate(); err != nil { 35 flag.CommandLine.PrintDefaults() 36 exitErrorf(err, "invalid arguments") 37 } 38 39 client := NewClient(config.Client) 40 41 var creds *credentials.Credentials 42 if config.SDK.Anonymous { 43 creds = credentials.AnonymousCredentials 44 } 45 46 var endpoint *string 47 if v := config.Endpoint; len(v) != 0 { 48 endpoint = &v 49 } 50 51 sess, err := session.NewSession(&aws.Config{ 52 HTTPClient: client, 53 Endpoint: endpoint, 54 Credentials: creds, 55 S3Disable100Continue: aws.Bool(!config.SDK.ExpectContinue), 56 }) 57 if err != nil { 58 exitErrorf(err, "failed to load config") 59 } 60 61 // Create context cancel for Ctrl+C/Interrupt 62 ctx, cancelFn := context.WithCancel(context.Background()) 63 defer cancelFn() 64 sigCh := make(chan os.Signal, 1) 65 signal.Notify(sigCh, os.Interrupt) 66 go func() { 67 <-sigCh 68 cancelFn() 69 }() 70 71 // Use the request duration timeout if specified. 72 if config.RequestDuration != 0 { 73 var timeoutFn func() 74 ctx, timeoutFn = context.WithTimeout(ctx, config.RequestDuration) 75 defer timeoutFn() 76 } 77 78 logger := NewLogger(os.Stdout) 79 80 // Start making the requests. 81 svc := s3.New(sess) 82 var reqCount int64 83 errCount := 0 84 for { 85 trace := doRequest(ctx, reqCount, svc, config) 86 select { 87 case <-ctx.Done(): 88 return 89 default: 90 } 91 logger.RecordTrace(trace) 92 93 if err := trace.Err(); err != nil { 94 fmt.Fprintf(os.Stderr, err.Error()) 95 errCount++ 96 } else { 97 errCount = 0 98 } 99 100 if config.RequestCount > 0 && reqCount == config.RequestCount { 101 return 102 } 103 104 reqCount++ 105 106 // If the first several requests fail, exist, something is broken. 107 if errCount == 5 && reqCount == 5 { 108 exitErrorf(trace.Err(), "unable to make requests") 109 } 110 111 if config.RequestDelay > 0 { 112 time.Sleep(config.RequestDelay) 113 } 114 } 115} 116 117func doRequest(ctx context.Context, id int64, svc *s3.S3, config Config) *RequestTrace { 118 traceCtx := NewRequestTrace(ctx, id) 119 defer traceCtx.RequestDone() 120 121 resp, err := svc.GetObjectWithContext(traceCtx, &s3.GetObjectInput{ 122 Bucket: &config.Bucket, 123 Key: &config.Key, 124 }, func(r *request.Request) { 125 r.Handlers.Send.PushFront(traceCtx.OnSendAttempt) 126 r.Handlers.Complete.PushBack(traceCtx.OnCompleteRequest) 127 r.Handlers.CompleteAttempt.PushBack(traceCtx.OnCompleteAttempt) 128 }) 129 if err != nil { 130 traceCtx.AppendError(fmt.Errorf("request failed, %v", err)) 131 return traceCtx 132 } 133 defer resp.Body.Close() 134 135 if n, err := io.Copy(ioutil.Discard, resp.Body); err != nil { 136 traceCtx.AppendError(fmt.Errorf("read request body failed, read %v, %v", n, err)) 137 return traceCtx 138 } 139 140 return traceCtx 141} 142 143func exitErrorf(err error, msg string, args ...interface{}) { 144 fmt.Fprintf(os.Stderr, "FAILED: %v\n"+msg+"\n", append([]interface{}{err}, args...)...) 145 os.Exit(1) 146} 147