1package main 2 3import ( 4 _ "bufio" 5 "fmt" 6 _ "io" 7 "log" 8 "math/rand" 9 "os" 10 "path/filepath" 11 "strings" 12 13 "github.com/aws/aws-sdk-go/aws" 14 "github.com/aws/aws-sdk-go/aws/session" 15 "github.com/aws/aws-sdk-go/service/s3" 16 _ "github.com/aws/aws-sdk-go/service/s3/s3manager" 17) 18 19// S3Output output plugin 20type S3Output struct { 21 pathTemplate string 22 23 buffer *FileOutput 24 session *session.Session 25 config *FileOutputConfig 26 closeCh chan struct{} 27} 28 29// NewS3Output constructor for FileOutput, accepts path 30func NewS3Output(pathTemplate string, config *FileOutputConfig) *S3Output { 31 if !PRO { 32 log.Fatal("Using S3 output and input requires PRO license") 33 return nil 34 } 35 36 o := new(S3Output) 37 o.pathTemplate = pathTemplate 38 o.config = config 39 o.config.onClose = o.onBufferUpdate 40 41 if config.BufferPath == "" { 42 config.BufferPath = "/tmp" 43 } 44 45 rnd := rand.Int63() 46 bufferName := fmt.Sprintf("gor_output_s3_%d_buf_", rnd) 47 48 pathParts := strings.Split(pathTemplate, "/") 49 bufferName += pathParts[len(pathParts)-1] 50 51 if strings.HasSuffix(o.pathTemplate, ".gz") { 52 bufferName += ".gz" 53 } 54 55 bufferPath := filepath.Join(config.BufferPath, bufferName) 56 57 o.buffer = NewFileOutput(bufferPath, config) 58 o.connect() 59 60 return o 61} 62 63func (o *S3Output) connect() { 64 if o.session == nil { 65 o.session = session.Must(session.NewSession(awsConfig())) 66 log.Println("[S3 Output] S3 connection succesfully initialized") 67 } 68} 69 70func (o *S3Output) Write(data []byte) (n int, err error) { 71 return o.buffer.Write(data) 72} 73 74func (o *S3Output) String() string { 75 return "S3 output: " + o.pathTemplate 76} 77 78// Close close the buffer of the S3 connection 79func (o *S3Output) Close() error { 80 return o.buffer.Close() 81} 82 83func parseS3Url(path string) (bucket, key string) { 84 path = path[5:] // stripping `s3://` 85 sep := strings.IndexByte(path, '/') 86 87 bucket = path[:sep] 88 key = path[sep+1:] 89 90 return bucket, key 91} 92 93func (o *S3Output) keyPath(idx int) (bucket, key string) { 94 bucket, key = parseS3Url(o.pathTemplate) 95 96 for name, fn := range dateFileNameFuncs { 97 key = strings.Replace(key, name, fn(o.buffer), -1) 98 } 99 100 key = setFileIndex(key, idx) 101 102 return 103} 104 105func (o *S3Output) onBufferUpdate(path string) { 106 svc := s3.New(o.session) 107 idx := getFileIndex(path) 108 bucket, key := o.keyPath(idx) 109 110 file, _ := os.Open(path) 111 // reader := bufio.NewReader(file) 112 113 _, err := svc.PutObject(&s3.PutObjectInput{ 114 Body: file, 115 Bucket: aws.String(bucket), 116 Key: aws.String(key), 117 }) 118 if err != nil { 119 log.Printf("[S3 Output] Failed to upload data to %s/%s, %s\n", bucket, key, err) 120 os.Remove(path) 121 return 122 } 123 124 os.Remove(path) 125 126 if o.closeCh != nil { 127 o.closeCh <- struct{}{} 128 } 129} 130