1package fluent 2 3import ( 4 "encoding/json" 5 "io/ioutil" 6 "net" 7 "reflect" 8 "runtime" 9 "testing" 10 "time" 11 12 "github.com/bmizerany/assert" 13) 14 15const ( 16 RECV_BUF_LEN = 1024 17) 18 19// Conn is net.Conn with the parameters to be verified in the test 20type Conn struct { 21 net.Conn 22 buf []byte 23 writeDeadline time.Time 24} 25 26func (c *Conn) Read(b []byte) (int, error) { 27 copy(b, c.buf) 28 return len(c.buf), nil 29} 30 31func (c *Conn) Write(b []byte) (int, error) { 32 c.buf = make([]byte, len(b)) 33 copy(c.buf, b) 34 return len(b), nil 35} 36 37func (c *Conn) SetWriteDeadline(t time.Time) error { 38 c.writeDeadline = t 39 return nil 40} 41 42func (c *Conn) Close() error { 43 return nil 44} 45 46func init() { 47 numProcs := runtime.NumCPU() 48 if numProcs < 2 { 49 numProcs = 2 50 } 51 runtime.GOMAXPROCS(numProcs) 52 53 listener, err := net.Listen("tcp", "0.0.0.0:6666") 54 if err != nil { 55 println("error listening:", err.Error()) 56 } 57 go func() { 58 for { 59 conn, err := listener.Accept() 60 if err != nil { 61 println("Error accept:", err.Error()) 62 return 63 } 64 go EchoFunc(conn) 65 } 66 }() 67} 68 69func EchoFunc(conn net.Conn) { 70 for { 71 buf := make([]byte, RECV_BUF_LEN) 72 n, err := conn.Read(buf) 73 if err != nil { 74 println("Error reading:", err.Error()) 75 return 76 } 77 println("received ", n, " bytes of data =", string(buf)) 78 } 79} 80 81func Test_New_itShouldUseDefaultConfigValuesIfNoOtherProvided(t *testing.T) { 82 f, _ := New(Config{}) 83 assert.Equal(t, f.Config.FluentPort, defaultPort) 84 assert.Equal(t, f.Config.FluentHost, defaultHost) 85 assert.Equal(t, f.Config.Timeout, defaultTimeout) 86 assert.Equal(t, f.Config.WriteTimeout, defaultWriteTimeout) 87 assert.Equal(t, f.Config.BufferLimit, defaultBufferLimit) 88 assert.Equal(t, f.Config.FluentNetwork, defaultNetwork) 89 assert.Equal(t, f.Config.FluentSocketPath, defaultSocketPath) 90} 91 92func Test_New_itShouldUseUnixDomainSocketIfUnixSocketSpecified(t *testing.T) { 93 if runtime.GOOS == "windows" { 94 t.Skip("windows not supported") 95 } 96 socketFile := "/tmp/fluent-logger-golang.sock" 97 network := "unix" 98 l, err := net.Listen(network, socketFile) 99 if err != nil { 100 t.Error(err) 101 return 102 } 103 defer l.Close() 104 105 f, err := New(Config{ 106 FluentNetwork: network, 107 FluentSocketPath: socketFile}) 108 if err != nil { 109 t.Error(err) 110 return 111 } 112 defer f.Close() 113 assert.Equal(t, f.Config.FluentNetwork, network) 114 assert.Equal(t, f.Config.FluentSocketPath, socketFile) 115 116 socketFile = "/tmp/fluent-logger-golang-xxx.sock" 117 network = "unixxxx" 118 fUnknown, err := New(Config{ 119 FluentNetwork: network, 120 FluentSocketPath: socketFile}) 121 if _, ok := err.(net.UnknownNetworkError); !ok { 122 t.Errorf("err type: %T", err) 123 } 124 if err == nil { 125 t.Error(err) 126 fUnknown.Close() 127 return 128 } 129} 130 131func Test_New_itShouldUseConfigValuesFromArguments(t *testing.T) { 132 f, _ := New(Config{FluentPort: 6666, FluentHost: "foobarhost"}) 133 assert.Equal(t, f.Config.FluentPort, 6666) 134 assert.Equal(t, f.Config.FluentHost, "foobarhost") 135} 136 137func Test_New_itShouldUseConfigValuesFromMashalAsJSONArgument(t *testing.T) { 138 f, _ := New(Config{MarshalAsJSON: true}) 139 assert.Equal(t, f.Config.MarshalAsJSON, true) 140} 141 142func Test_send_WritePendingToConn(t *testing.T) { 143 f := &Fluent{Config: Config{}, reconnecting: false} 144 145 conn := &Conn{} 146 f.conn = conn 147 148 msg := "This is test writing." 149 bmsg := []byte(msg) 150 f.pending = append(f.pending, bmsg...) 151 152 err := f.send() 153 if err != nil { 154 t.Error(err) 155 } 156 157 rcv := make([]byte, len(conn.buf)) 158 _, err = conn.Read(rcv) 159 if string(rcv) != msg { 160 t.Errorf("got %s, except %s", string(rcv), msg) 161 } 162} 163 164func Test_MarshalAsMsgpack(t *testing.T) { 165 f := &Fluent{Config: Config{}, reconnecting: false} 166 167 conn := &Conn{} 168 f.conn = conn 169 170 tag := "tag" 171 var data = map[string]string{ 172 "foo": "bar", 173 "hoge": "hoge"} 174 tm := time.Unix(1267867237, 0) 175 result, err := f.EncodeData(tag, tm, data) 176 177 if err != nil { 178 t.Error(err) 179 } 180 actual := string(result) 181 182 // map entries are disordered in golang 183 expected1 := "\x94\xA3tag\xD2K\x92\u001Ee\x82\xA3foo\xA3bar\xA4hoge\xA4hoge\xC0" 184 expected2 := "\x94\xA3tag\xD2K\x92\u001Ee\x82\xA4hoge\xA4hoge\xA3foo\xA3bar\xC0" 185 if actual != expected1 && actual != expected2 { 186 t.Errorf("got %x,\n except %x\n or %x", actual, expected1, expected2) 187 } 188} 189 190func Test_SubSecondPrecision(t *testing.T) { 191 // Setup the test subject 192 fluent := &Fluent{ 193 Config: Config{ 194 SubSecondPrecision: true, 195 }, 196 reconnecting: false, 197 } 198 fluent.conn = &Conn{} 199 200 // Exercise the test subject 201 timestamp := time.Unix(1267867237, 256) 202 encodedData, err := fluent.EncodeData("tag", timestamp, map[string]string{ 203 "foo": "bar", 204 }) 205 206 // Assert no encoding errors and that the timestamp has been encoded into 207 // the message as expected. 208 if err != nil { 209 t.Error(err) 210 } 211 212 expected := "\x94\xA3tag\xC7\x08\x00K\x92\u001Ee\x00\x00\x01\x00\x81\xA3foo\xA3bar\xC0" 213 actual := string(encodedData) 214 assert.Equal(t, expected, actual) 215} 216 217func Test_MarshalAsJSON(t *testing.T) { 218 f := &Fluent{Config: Config{MarshalAsJSON: true}, reconnecting: false} 219 220 conn := &Conn{} 221 f.conn = conn 222 223 var data = map[string]string{ 224 "foo": "bar", 225 "hoge": "hoge"} 226 tm := time.Unix(1267867237, 0) 227 result, err := f.EncodeData("tag", tm, data) 228 229 if err != nil { 230 t.Error(err) 231 } 232 // json.Encode marshals map keys in the order, so this expectation is safe 233 expected := `["tag",1267867237,{"foo":"bar","hoge":"hoge"},null]` 234 actual := string(result) 235 if actual != expected { 236 t.Errorf("got %s, except %s", actual, expected) 237 } 238} 239 240func TestJsonConfig(t *testing.T) { 241 b, err := ioutil.ReadFile(`testdata/config.json`) 242 if err != nil { 243 t.Error(err) 244 } 245 var got Config 246 expect := Config{ 247 FluentPort: 8888, 248 FluentHost: "localhost", 249 FluentNetwork: "tcp", 250 FluentSocketPath: "/var/tmp/fluent.sock", 251 Timeout: 3000, 252 WriteTimeout: 6000, 253 BufferLimit: 200, 254 RetryWait: 5, 255 MaxRetry: 3, 256 TagPrefix: "fluent", 257 AsyncConnect: false, 258 MarshalAsJSON: true, 259 } 260 261 err = json.Unmarshal(b, &got) 262 if err != nil { 263 t.Error(err) 264 } 265 266 if !reflect.DeepEqual(expect, got) { 267 t.Errorf("got %v, except %v", got, expect) 268 } 269} 270 271func TestAsyncConnect(t *testing.T) { 272 type result struct { 273 f *Fluent 274 err error 275 } 276 ch := make(chan result, 1) 277 go func() { 278 config := Config{ 279 FluentPort: 8888, 280 AsyncConnect: true, 281 } 282 f, err := New(config) 283 ch <- result{f: f, err: err} 284 }() 285 286 select { 287 case res := <-ch: 288 if res.err != nil { 289 t.Errorf("fluent.New() failed with %#v", res.err) 290 return 291 } 292 res.f.Close() 293 case <-time.After(time.Millisecond * 500): 294 t.Error("AsyncConnect must not block") 295 } 296} 297 298func Test_PostWithTimeNotTimeOut(t *testing.T) { 299 f, err := New(Config{ 300 FluentPort: 6666, 301 AsyncConnect: false, 302 MarshalAsJSON: true, // easy to check equality 303 }) 304 if err != nil { 305 t.Error(err) 306 return 307 } 308 309 var testData = []struct { 310 in map[string]string 311 out string 312 }{ 313 { 314 map[string]string{"foo": "bar"}, 315 "[\"tag_name\",1482493046,{\"foo\":\"bar\"},null]", 316 }, 317 { 318 map[string]string{"fuga": "bar", "hoge": "fuga"}, 319 "[\"tag_name\",1482493046,{\"fuga\":\"bar\",\"hoge\":\"fuga\"},null]", 320 }, 321 } 322 for _, tt := range testData { 323 conn := &Conn{} 324 f.conn = conn 325 326 err = f.PostWithTime("tag_name", time.Unix(1482493046, 0), tt.in) 327 if err != nil { 328 t.Errorf("in=%s, err=%s", tt.in, err) 329 } 330 331 rcv := make([]byte, len(conn.buf)) 332 _, err = conn.Read(rcv) 333 if string(rcv) != tt.out { 334 t.Errorf("got %s, except %s", string(rcv), tt.out) 335 } 336 337 if !conn.writeDeadline.IsZero() { 338 t.Errorf("got %s, except 0", conn.writeDeadline) 339 } 340 } 341} 342 343func Test_PostMsgpMarshaler(t *testing.T) { 344 f, err := New(Config{ 345 FluentPort: 6666, 346 AsyncConnect: false, 347 MarshalAsJSON: true, // easy to check equality 348 }) 349 if err != nil { 350 t.Error(err) 351 return 352 } 353 354 var testData = []struct { 355 in *TestMessage 356 out string 357 }{ 358 { 359 &TestMessage{Foo: "bar"}, 360 "[\"tag_name\",1482493046,{\"foo\":\"bar\"},null]", 361 }, 362 } 363 for _, tt := range testData { 364 conn := &Conn{} 365 f.conn = conn 366 367 err = f.PostWithTime("tag_name", time.Unix(1482493046, 0), tt.in) 368 if err != nil { 369 t.Errorf("in=%s, err=%s", tt.in, err) 370 } 371 372 rcv := make([]byte, len(conn.buf)) 373 _, err = conn.Read(rcv) 374 if string(rcv) != tt.out { 375 t.Errorf("got %s, except %s", string(rcv), tt.out) 376 } 377 378 if !conn.writeDeadline.IsZero() { 379 t.Errorf("got %s, except 0", conn.writeDeadline) 380 } 381 } 382} 383 384func Benchmark_PostWithShortMessage(b *testing.B) { 385 b.StopTimer() 386 f, err := New(Config{}) 387 if err != nil { 388 panic(err) 389 } 390 391 b.StartTimer() 392 data := map[string]string{"message": "Hello World"} 393 for i := 0; i < b.N; i++ { 394 if err := f.Post("tag", data); err != nil { 395 panic(err) 396 } 397 } 398} 399 400func Benchmark_PostWithShortMessageMarshalAsJSON(b *testing.B) { 401 b.StopTimer() 402 f, err := New(Config{MarshalAsJSON: true}) 403 if err != nil { 404 panic(err) 405 } 406 407 b.StartTimer() 408 data := map[string]string{"message": "Hello World"} 409 for i := 0; i < b.N; i++ { 410 if err := f.Post("tag", data); err != nil { 411 panic(err) 412 } 413 } 414} 415 416func Benchmark_LogWithChunks(b *testing.B) { 417 b.StopTimer() 418 f, err := New(Config{}) 419 if err != nil { 420 panic(err) 421 } 422 423 b.StartTimer() 424 data := map[string]string{"msg": "sdfsdsdfdsfdsddddfsdfsdsdfdsfdsddddfsdfsdsdfdsfdsddddfsdfsdsdfdsfdsddddfsdfsdsdfdsfdsddddfsdfsdsdfdsfdsddddfsdfsdsdfdsfdsddddfsdfsdsdfdsfdsddddf"} 425 for i := 0; i < b.N; i++ { 426 if err := f.Post("tag", data); err != nil { 427 panic(err) 428 } 429 } 430} 431 432func Benchmark_PostWithStruct(b *testing.B) { 433 b.StopTimer() 434 f, err := New(Config{}) 435 if err != nil { 436 panic(err) 437 } 438 439 b.StartTimer() 440 data := struct { 441 Name string `msg:"msgnamename"` 442 }{ 443 "john smith", 444 } 445 for i := 0; i < b.N; i++ { 446 if err := f.Post("tag", data); err != nil { 447 panic(err) 448 } 449 } 450} 451 452func Benchmark_PostWithStructTaggedAsCodec(b *testing.B) { 453 b.StopTimer() 454 f, err := New(Config{}) 455 if err != nil { 456 panic(err) 457 } 458 459 b.StartTimer() 460 data := struct { 461 Name string `codec:"codecname"` 462 }{ 463 "john smith", 464 } 465 for i := 0; i < b.N; i++ { 466 if err := f.Post("tag", data); err != nil { 467 panic(err) 468 } 469 } 470} 471 472func Benchmark_PostWithStructWithoutTag(b *testing.B) { 473 b.StopTimer() 474 f, err := New(Config{}) 475 if err != nil { 476 panic(err) 477 } 478 479 b.StartTimer() 480 data := struct { 481 Name string 482 }{ 483 "john smith", 484 } 485 for i := 0; i < b.N; i++ { 486 if err := f.Post("tag", data); err != nil { 487 panic(err) 488 } 489 } 490} 491 492func Benchmark_PostWithMapString(b *testing.B) { 493 b.StopTimer() 494 f, err := New(Config{}) 495 if err != nil { 496 panic(err) 497 } 498 499 b.StartTimer() 500 data := map[string]string{ 501 "foo": "bar", 502 } 503 for i := 0; i < b.N; i++ { 504 if err := f.Post("tag", data); err != nil { 505 panic(err) 506 } 507 } 508} 509 510func Benchmark_PostWithMsgpMarshaler(b *testing.B) { 511 b.StopTimer() 512 f, err := New(Config{}) 513 if err != nil { 514 panic(err) 515 } 516 517 b.StartTimer() 518 data := &TestMessage{Foo: "bar"} 519 for i := 0; i < b.N; i++ { 520 if err := f.Post("tag", data); err != nil { 521 panic(err) 522 } 523 } 524} 525 526func Benchmark_PostWithMapSlice(b *testing.B) { 527 b.StopTimer() 528 f, err := New(Config{}) 529 if err != nil { 530 panic(err) 531 } 532 533 b.StartTimer() 534 data := map[string][]int{ 535 "foo": {1, 2, 3}, 536 } 537 for i := 0; i < b.N; i++ { 538 if err := f.Post("tag", data); err != nil { 539 panic(err) 540 } 541 } 542} 543 544func Benchmark_PostWithMapStringAndTime(b *testing.B) { 545 b.StopTimer() 546 f, err := New(Config{}) 547 if err != nil { 548 panic(err) 549 } 550 551 b.StartTimer() 552 data := map[string]string{ 553 "foo": "bar", 554 } 555 tm := time.Now() 556 for i := 0; i < b.N; i++ { 557 if err := f.PostWithTime("tag", tm, data); err != nil { 558 panic(err) 559 } 560 } 561} 562