1package main
2
3import (
4	_ "bufio"
5	"fmt"
6	_ "io"
7	"log"
8	"math/rand"
9	"os"
10	"path/filepath"
11	"strings"
12
13	"github.com/aws/aws-sdk-go/aws"
14	"github.com/aws/aws-sdk-go/aws/session"
15	"github.com/aws/aws-sdk-go/service/s3"
16	_ "github.com/aws/aws-sdk-go/service/s3/s3manager"
17)
18
19// S3Output output plugin
20type S3Output struct {
21	pathTemplate string
22
23	buffer  *FileOutput
24	session *session.Session
25	config  *FileOutputConfig
26	closeCh chan struct{}
27}
28
29// NewS3Output constructor for FileOutput, accepts path
30func NewS3Output(pathTemplate string, config *FileOutputConfig) *S3Output {
31	if !PRO {
32		log.Fatal("Using S3 output and input requires PRO license")
33		return nil
34	}
35
36	o := new(S3Output)
37	o.pathTemplate = pathTemplate
38	o.config = config
39	o.config.onClose = o.onBufferUpdate
40
41	if config.BufferPath == "" {
42		config.BufferPath = "/tmp"
43	}
44
45	rnd := rand.Int63()
46	bufferName := fmt.Sprintf("gor_output_s3_%d_buf_", rnd)
47
48	pathParts := strings.Split(pathTemplate, "/")
49	bufferName += pathParts[len(pathParts)-1]
50
51	if strings.HasSuffix(o.pathTemplate, ".gz") {
52		bufferName += ".gz"
53	}
54
55	bufferPath := filepath.Join(config.BufferPath, bufferName)
56
57	o.buffer = NewFileOutput(bufferPath, config)
58	o.connect()
59
60	return o
61}
62
63func (o *S3Output) connect() {
64	if o.session == nil {
65		o.session = session.Must(session.NewSession(awsConfig()))
66		log.Println("[S3 Output] S3 connection succesfully initialized")
67	}
68}
69
70func (o *S3Output) Write(data []byte) (n int, err error) {
71	return o.buffer.Write(data)
72}
73
74func (o *S3Output) String() string {
75	return "S3 output: " + o.pathTemplate
76}
77
78// Close close the buffer of the S3 connection
79func (o *S3Output) Close() error {
80	return o.buffer.Close()
81}
82
// parseS3Url splits an `s3://bucket/key` URL into its bucket and key.
//
// The `s3://` prefix is removed when present. Everything up to the first '/'
// is the bucket and the remainder is the key. A URL without a key part
// (for example `s3://bucket`) yields the bucket and an empty key instead of
// panicking, and an empty input yields two empty strings.
func parseS3Url(path string) (bucket, key string) {
	path = strings.TrimPrefix(path, "s3://")

	sep := strings.IndexByte(path, '/')
	if sep < 0 {
		// No separator: the whole remainder is the bucket name.
		return path, ""
	}

	return path[:sep], path[sep+1:]
}
92
93func (o *S3Output) keyPath(idx int) (bucket, key string) {
94	bucket, key = parseS3Url(o.pathTemplate)
95
96	for name, fn := range dateFileNameFuncs {
97		key = strings.Replace(key, name, fn(o.buffer), -1)
98	}
99
100	key = setFileIndex(key, idx)
101
102	return
103}
104
105func (o *S3Output) onBufferUpdate(path string) {
106	svc := s3.New(o.session)
107	idx := getFileIndex(path)
108	bucket, key := o.keyPath(idx)
109
110	file, _ := os.Open(path)
111	// reader := bufio.NewReader(file)
112
113	_, err := svc.PutObject(&s3.PutObjectInput{
114		Body:   file,
115		Bucket: aws.String(bucket),
116		Key:    aws.String(key),
117	})
118	if err != nil {
119		log.Printf("[S3 Output] Failed to upload data to %s/%s, %s\n", bucket, key, err)
120		os.Remove(path)
121		return
122	}
123
124	os.Remove(path)
125
126	if o.closeCh != nil {
127		o.closeCh <- struct{}{}
128	}
129}
130