1package fluent
2
3import (
4	"errors"
5	"fmt"
6	"math"
7	"net"
8	"reflect"
9	"strconv"
10	"sync"
11	"time"
12)
13
// Package-level defaults. New substitutes the host, port, timeout, buffer
// limit, retry wait and max retry values for Config fields left at zero.
const (
	defaultHost                   = "127.0.0.1"
	defaultPort                   = 24224
	defaultTimeout                = 3 * time.Second
	// Bytes: compared against the length of the pending byte buffer.
	defaultBufferLimit            = 8 * 1024 * 1024
	// Milliseconds: the reconnect loop multiplies the computed wait by
	// time.Millisecond.
	defaultRetryWait              = 500
	defaultMaxRetry               = 13
	// Base of the exponential growth applied to the wait between
	// reconnect attempts.
	defaultReconnectWaitIncreRate = 1.5
)
23
// Config holds the settings of a Fluent client. New replaces zero-valued
// FluentPort, FluentHost, Timeout, BufferLimit, RetryWait and MaxRetry with
// the package defaults.
type Config struct {
	// FluentPort is the TCP port to dial.
	FluentPort  int
	// FluentHost is the host to dial.
	FluentHost  string
	// Timeout is the timeout used when dialing the connection.
	Timeout     time.Duration
	// BufferLimit is the size in bytes above which the pending buffer is
	// discarded after a failed send.
	BufferLimit int
	// RetryWait is the base wait, in milliseconds, between reconnect
	// attempts.
	RetryWait   int
	// MaxRetry bounds the reconnect loop: retrying stops once the
	// zero-based attempt counter reaches this value.
	MaxRetry    int
	// TagPrefix, when non-empty, is prepended to every tag followed by ".".
	TagPrefix   string
}
33
// Fluent is a client that buffers encoded records in memory and writes them
// over a TCP connection.
type Fluent struct {
	Config
	// conn is the current connection; it is nil while disconnected.
	conn         net.Conn
	// pending holds encoded bytes that have been queued for writing.
	pending      []byte
	// reconnecting is true while a background reconnect loop is running.
	reconnecting bool
	// mu is the lock taken around updates to pending and reconnecting and
	// around writing to and closing conn.
	mu           sync.Mutex
}
41
42// New creates a new Logger.
43func New(config Config) (f *Fluent, err error) {
44	if config.FluentHost == "" {
45		config.FluentHost = defaultHost
46	}
47	if config.FluentPort == 0 {
48		config.FluentPort = defaultPort
49	}
50	if config.Timeout == 0 {
51		config.Timeout = defaultTimeout
52	}
53	if config.BufferLimit == 0 {
54		config.BufferLimit = defaultBufferLimit
55	}
56	if config.RetryWait == 0 {
57		config.RetryWait = defaultRetryWait
58	}
59	if config.MaxRetry == 0 {
60		config.MaxRetry = defaultMaxRetry
61	}
62	f = &Fluent{Config: config, reconnecting: false}
63	err = f.connect()
64	return
65}
66
67// Post writes the output for a logging event.
68//
69// Examples:
70//
71//  // send string
72//  f.Post("tag_name", "data")
73//
74//  // send map[string]
75//  mapStringData := map[string]string{
76//  	"foo":  "bar",
77//  }
78//  f.Post("tag_name", mapStringData)
79//
80//  // send message with specified time
81//  mapStringData := map[string]string{
82//  	"foo":  "bar",
83//  }
84//  tm := time.Now()
85//  f.PostWithTime("tag_name", tm, mapStringData)
86//
87//  // send struct
88//  structData := struct {
89//  		Name string `msg:"name"`
90//  } {
91//  		"john smith",
92//  }
93//  f.Post("tag_name", structData)
94//
95func (f *Fluent) Post(tag string, message interface{}) error {
96	timeNow := time.Now()
97	return f.PostWithTime(tag, timeNow, message)
98}
99
100func (f *Fluent) PostWithTime(tag string, tm time.Time, message interface{}) error {
101	if len(f.TagPrefix) > 0 {
102		tag = f.TagPrefix + "." + tag
103	}
104
105	msg := reflect.ValueOf(message)
106	msgtype := msg.Type()
107
108	if msgtype.Kind() == reflect.Struct {
109		// message should be tagged by "codec" or "msg"
110		kv := make(map[string]interface{})
111		fields := msgtype.NumField()
112		for i := 0; i < fields; i++ {
113			field := msgtype.Field(i)
114			name := field.Name
115			if n1 := field.Tag.Get("msg"); n1 != "" {
116				name = n1
117			} else if n2 := field.Tag.Get("codec"); n2 != "" {
118				name = n2
119			}
120			kv[name] = msg.FieldByIndex(field.Index).Interface()
121		}
122		return f.EncodeAndPostData(tag, tm, kv)
123	}
124
125	if msgtype.Kind() != reflect.Map {
126		return errors.New("messge must be a map")
127	} else if msgtype.Key().Kind() != reflect.String {
128		return errors.New("map keys must be strings")
129	}
130
131	kv := make(map[string]interface{})
132	for _, k := range msg.MapKeys() {
133		kv[k.String()] = msg.MapIndex(k).Interface()
134	}
135
136	return f.EncodeAndPostData(tag, tm, kv)
137}
138
139func (f *Fluent) EncodeAndPostData(tag string, tm time.Time, message interface{}) error {
140	if data, dumperr := f.EncodeData(tag, tm, message); dumperr != nil {
141		return fmt.Errorf("fluent#EncodeAndPostData: can't convert '%s' to msgpack:%s", message, dumperr)
142		// fmt.Println("fluent#Post: can't convert to msgpack:", message, dumperr)
143	} else {
144		f.PostRawData(data)
145		return nil
146	}
147}
148
149func (f *Fluent) PostRawData(data []byte) {
150	f.mu.Lock()
151	f.pending = append(f.pending, data...)
152	f.mu.Unlock()
153	if err := f.send(); err != nil {
154		f.close()
155		if len(f.pending) > f.Config.BufferLimit {
156			f.flushBuffer()
157		}
158	} else {
159		f.flushBuffer()
160	}
161}
162
163func (f *Fluent) EncodeData(tag string, tm time.Time, message interface{}) (data []byte, err error) {
164	timeUnix := tm.Unix()
165	msg := &Message{Tag: tag, Time: timeUnix, Record: message}
166	data, err = msg.MarshalMsg(nil)
167	return
168}
169
170// Close closes the connection.
171func (f *Fluent) Close() (err error) {
172	if len(f.pending) > 0 {
173		_ = f.send()
174	}
175	err = f.close()
176	return
177}
178
179// close closes the connection.
180func (f *Fluent) close() (err error) {
181	if f.conn != nil {
182		f.mu.Lock()
183		defer f.mu.Unlock()
184	} else {
185		return
186	}
187	if f.conn != nil {
188		f.conn.Close()
189		f.conn = nil
190	}
191	return
192}
193
194// connect establishes a new connection using the specified transport.
195func (f *Fluent) connect() (err error) {
196	f.conn, err = net.DialTimeout("tcp", f.Config.FluentHost+":"+strconv.Itoa(f.Config.FluentPort), f.Config.Timeout)
197	return
198}
199
// e returns x raised to the power y, converted to int. The conversion
// truncates toward zero, so any result below 1 becomes 0.
func e(x, y float64) int {
	power := math.Pow(x, y)
	return int(power)
}
203
204func (f *Fluent) reconnect() {
205	go func() {
206		for i := 0; ; i++ {
207			err := f.connect()
208			if err == nil {
209				f.mu.Lock()
210				f.reconnecting = false
211				f.mu.Unlock()
212				break
213			} else {
214				if i == f.Config.MaxRetry {
215					panic("fluent#reconnect: failed to reconnect!")
216				}
217				waitTime := f.Config.RetryWait * e(defaultReconnectWaitIncreRate, float64(i-1))
218				time.Sleep(time.Duration(waitTime) * time.Millisecond)
219			}
220		}
221	}()
222}
223
224func (f *Fluent) flushBuffer() {
225	f.mu.Lock()
226	defer f.mu.Unlock()
227	f.pending = f.pending[0:0]
228}
229
230func (f *Fluent) send() (err error) {
231	if f.conn == nil {
232		if f.reconnecting == false {
233			f.mu.Lock()
234			f.reconnecting = true
235			f.mu.Unlock()
236			f.reconnect()
237		}
238		err = errors.New("fluent#send: can't send logs, client is reconnecting")
239	} else {
240		f.mu.Lock()
241		_, err = f.conn.Write(f.pending)
242		f.mu.Unlock()
243	}
244	return
245}
246