1package fluent 2 3import ( 4 "errors" 5 "fmt" 6 "math" 7 "net" 8 "reflect" 9 "strconv" 10 "sync" 11 "time" 12) 13 14const ( 15 defaultHost = "127.0.0.1" 16 defaultPort = 24224 17 defaultTimeout = 3 * time.Second 18 defaultBufferLimit = 8 * 1024 * 1024 19 defaultRetryWait = 500 20 defaultMaxRetry = 13 21 defaultReconnectWaitIncreRate = 1.5 22) 23 24type Config struct { 25 FluentPort int 26 FluentHost string 27 Timeout time.Duration 28 BufferLimit int 29 RetryWait int 30 MaxRetry int 31 TagPrefix string 32} 33 34type Fluent struct { 35 Config 36 conn net.Conn 37 pending []byte 38 reconnecting bool 39 mu sync.Mutex 40} 41 42// New creates a new Logger. 43func New(config Config) (f *Fluent, err error) { 44 if config.FluentHost == "" { 45 config.FluentHost = defaultHost 46 } 47 if config.FluentPort == 0 { 48 config.FluentPort = defaultPort 49 } 50 if config.Timeout == 0 { 51 config.Timeout = defaultTimeout 52 } 53 if config.BufferLimit == 0 { 54 config.BufferLimit = defaultBufferLimit 55 } 56 if config.RetryWait == 0 { 57 config.RetryWait = defaultRetryWait 58 } 59 if config.MaxRetry == 0 { 60 config.MaxRetry = defaultMaxRetry 61 } 62 f = &Fluent{Config: config, reconnecting: false} 63 err = f.connect() 64 return 65} 66 67// Post writes the output for a logging event. 68// 69// Examples: 70// 71// // send string 72// f.Post("tag_name", "data") 73// 74// // send map[string] 75// mapStringData := map[string]string{ 76// "foo": "bar", 77// } 78// f.Post("tag_name", mapStringData) 79// 80// // send message with specified time 81// mapStringData := map[string]string{ 82// "foo": "bar", 83// } 84// tm := time.Now() 85// f.PostWithTime("tag_name", tm, mapStringData) 86// 87// // send struct 88// structData := struct { 89// Name string `msg:"name"` 90// } { 91// "john smith", 92// } 93// f.Post("tag_name", structData) 94// 95func (f *Fluent) Post(tag string, message interface{}) error { 96 timeNow := time.Now() 97 return f.PostWithTime(tag, timeNow, message) 98} 99 100func (f *Fluent) PostWithTime(tag string, tm time.Time, message interface{}) error { 101 if len(f.TagPrefix) > 0 { 102 tag = f.TagPrefix + "." + tag 103 } 104 105 msg := reflect.ValueOf(message) 106 msgtype := msg.Type() 107 108 if msgtype.Kind() == reflect.Struct { 109 // message should be tagged by "codec" or "msg" 110 kv := make(map[string]interface{}) 111 fields := msgtype.NumField() 112 for i := 0; i < fields; i++ { 113 field := msgtype.Field(i) 114 name := field.Name 115 if n1 := field.Tag.Get("msg"); n1 != "" { 116 name = n1 117 } else if n2 := field.Tag.Get("codec"); n2 != "" { 118 name = n2 119 } 120 kv[name] = msg.FieldByIndex(field.Index).Interface() 121 } 122 return f.EncodeAndPostData(tag, tm, kv) 123 } 124 125 if msgtype.Kind() != reflect.Map { 126 return errors.New("messge must be a map") 127 } else if msgtype.Key().Kind() != reflect.String { 128 return errors.New("map keys must be strings") 129 } 130 131 kv := make(map[string]interface{}) 132 for _, k := range msg.MapKeys() { 133 kv[k.String()] = msg.MapIndex(k).Interface() 134 } 135 136 return f.EncodeAndPostData(tag, tm, kv) 137} 138 139func (f *Fluent) EncodeAndPostData(tag string, tm time.Time, message interface{}) error { 140 if data, dumperr := f.EncodeData(tag, tm, message); dumperr != nil { 141 return fmt.Errorf("fluent#EncodeAndPostData: can't convert '%s' to msgpack:%s", message, dumperr) 142 // fmt.Println("fluent#Post: can't convert to msgpack:", message, dumperr) 143 } else { 144 f.PostRawData(data) 145 return nil 146 } 147} 148 149func (f *Fluent) PostRawData(data []byte) { 150 f.mu.Lock() 151 f.pending = append(f.pending, data...) 152 f.mu.Unlock() 153 if err := f.send(); err != nil { 154 f.close() 155 if len(f.pending) > f.Config.BufferLimit { 156 f.flushBuffer() 157 } 158 } else { 159 f.flushBuffer() 160 } 161} 162 163func (f *Fluent) EncodeData(tag string, tm time.Time, message interface{}) (data []byte, err error) { 164 timeUnix := tm.Unix() 165 msg := &Message{Tag: tag, Time: timeUnix, Record: message} 166 data, err = msg.MarshalMsg(nil) 167 return 168} 169 170// Close closes the connection. 171func (f *Fluent) Close() (err error) { 172 if len(f.pending) > 0 { 173 _ = f.send() 174 } 175 err = f.close() 176 return 177} 178 179// close closes the connection. 180func (f *Fluent) close() (err error) { 181 if f.conn != nil { 182 f.mu.Lock() 183 defer f.mu.Unlock() 184 } else { 185 return 186 } 187 if f.conn != nil { 188 f.conn.Close() 189 f.conn = nil 190 } 191 return 192} 193 194// connect establishes a new connection using the specified transport. 195func (f *Fluent) connect() (err error) { 196 f.conn, err = net.DialTimeout("tcp", f.Config.FluentHost+":"+strconv.Itoa(f.Config.FluentPort), f.Config.Timeout) 197 return 198} 199 200func e(x, y float64) int { 201 return int(math.Pow(x, y)) 202} 203 204func (f *Fluent) reconnect() { 205 go func() { 206 for i := 0; ; i++ { 207 err := f.connect() 208 if err == nil { 209 f.mu.Lock() 210 f.reconnecting = false 211 f.mu.Unlock() 212 break 213 } else { 214 if i == f.Config.MaxRetry { 215 panic("fluent#reconnect: failed to reconnect!") 216 } 217 waitTime := f.Config.RetryWait * e(defaultReconnectWaitIncreRate, float64(i-1)) 218 time.Sleep(time.Duration(waitTime) * time.Millisecond) 219 } 220 } 221 }() 222} 223 224func (f *Fluent) flushBuffer() { 225 f.mu.Lock() 226 defer f.mu.Unlock() 227 f.pending = f.pending[0:0] 228} 229 230func (f *Fluent) send() (err error) { 231 if f.conn == nil { 232 if f.reconnecting == false { 233 f.mu.Lock() 234 f.reconnecting = true 235 f.mu.Unlock() 236 f.reconnect() 237 } 238 err = errors.New("fluent#send: can't send logs, client is reconnecting") 239 } else { 240 f.mu.Lock() 241 _, err = f.conn.Write(f.pending) 242 f.mu.Unlock() 243 } 244 return 245} 246