package fasthttp

import (
	"bufio"
	"bytes"
	"crypto/tls"
	"errors"
	"fmt"
	"io"
	"net"
	"strings"
	"sync"
	"sync/atomic"
	"time"
)

// Do performs the given http request and fills the given http response.
//
// Do is a shortcut for calling Do on the package-level default client.
//
// Request must contain at least non-zero RequestURI with full url (including
// scheme and host) or non-zero Host header + RequestURI.
//
// Client determines the server to be requested in the following order:
//
//   - from RequestURI if it contains full url with scheme and host;
//   - from Host header otherwise.
//
// The function doesn't follow redirects. Use Get* for following redirects.
//
// Response is ignored if resp is nil.
//
// ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
// to the requested host are busy.
//
// It is recommended to obtain req and resp via AcquireRequest
// and AcquireResponse in performance-critical code.
func Do(req *Request, resp *Response) error {
	return defaultClient.Do(req, resp)
}

// DoTimeout performs the given request and waits for response during
// the given timeout duration.
//
// DoTimeout is a shortcut for calling DoTimeout on the package-level
// default client.
//
// Request must contain at least non-zero RequestURI with full url (including
// scheme and host) or non-zero Host header + RequestURI.
//
// Client determines the server to be requested in the following order:
//
//   - from RequestURI if it contains full url with scheme and host;
//   - from Host header otherwise.
//
// The function doesn't follow redirects. Use Get* for following redirects.
//
// Response is ignored if resp is nil.
//
// ErrTimeout is returned if the response wasn't returned during
// the given timeout.
//
// ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
// to the requested host are busy.
//
// It is recommended to obtain req and resp via AcquireRequest
// and AcquireResponse in performance-critical code.
//
// Warning: DoTimeout does not terminate the request itself. The request will
// continue in the background and the response will be discarded.
// If requests take too long and the connection pool gets filled up please
// try using a Client and setting a ReadTimeout.
func DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
	return defaultClient.DoTimeout(req, resp, timeout)
}

// DoDeadline performs the given request and waits for response until
// the given deadline.
//
// DoDeadline is a shortcut for calling DoDeadline on the package-level
// default client.
//
// Request must contain at least non-zero RequestURI with full url (including
// scheme and host) or non-zero Host header + RequestURI.
//
// Client determines the server to be requested in the following order:
//
//   - from RequestURI if it contains full url with scheme and host;
//   - from Host header otherwise.
//
// The function doesn't follow redirects. Use Get* for following redirects.
//
// Response is ignored if resp is nil.
//
// ErrTimeout is returned if the response wasn't returned until
// the given deadline.
//
// ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
// to the requested host are busy.
//
// It is recommended to obtain req and resp via AcquireRequest
// and AcquireResponse in performance-critical code.
func DoDeadline(req *Request, resp *Response, deadline time.Time) error {
	return defaultClient.DoDeadline(req, resp, deadline)
}

// Get returns the status code and body of url.
//
// Get is a shortcut for calling Get on the package-level default client.
//
// The contents of dst will be replaced by the body and returned. If dst
// is too small, a new slice will be allocated.
//
// The function follows redirects. Use Do* for manually handling redirects.
func Get(dst []byte, url string) (statusCode int, body []byte, err error) {
	return defaultClient.Get(dst, url)
}

// GetTimeout returns the status code and body of url.
//
// The contents of dst will be replaced by the body and returned, if the dst
// is too small a new slice will be allocated.
//
// The function follows redirects. Use Do* for manually handling redirects.
//
// ErrTimeout error is returned if url contents couldn't be fetched
// during the given timeout.
func GetTimeout(dst []byte, url string, timeout time.Duration) (statusCode int, body []byte, err error) {
	return defaultClient.GetTimeout(dst, url, timeout)
}

// GetDeadline returns the status code and body of url.
//
// GetDeadline is a shortcut for calling GetDeadline on the package-level
// default client.
//
// The contents of dst will be replaced by the body and returned. If dst
// is too small, a new slice will be allocated.
//
// The function follows redirects. Use Do* for manually handling redirects.
//
// ErrTimeout error is returned if url contents couldn't be fetched
// until the given deadline.
func GetDeadline(dst []byte, url string, deadline time.Time) (statusCode int, body []byte, err error) {
	return defaultClient.GetDeadline(dst, url, deadline)
}

// Post sends POST request to the given url with the given POST arguments.
//
// Post is a shortcut for calling Post on the package-level default client.
//
// The contents of dst will be replaced by the body and returned. If dst
// is too small, a new slice will be allocated.
//
// The function follows redirects. Use Do* for manually handling redirects.
//
// Empty POST body is sent if postArgs is nil.
func Post(dst []byte, url string, postArgs *Args) (statusCode int, body []byte, err error) {
	return defaultClient.Post(dst, url, postArgs)
}

// defaultClient is the zero-value Client backing the package-level
// Do*, Get* and Post helpers.
var defaultClient Client

// Client implements http client.
//
// Copying Client by value is prohibited. Create new instance instead.
//
// It is safe calling Client methods from concurrently running goroutines.
type Client struct {
	noCopy noCopy

	// Client name. Used in User-Agent request header.
	//
	// Default client name is used if not set.
	Name string

	// NoDefaultUserAgentHeader when set to true, causes the default
	// User-Agent header to be excluded from the Request.
	NoDefaultUserAgentHeader bool

	// Callback for establishing new connections to hosts.
	//
	// Default Dial is used if not set.
	Dial DialFunc

	// Attempt to connect to both ipv4 and ipv6 addresses if set to true.
	//
	// This option is used only if default TCP dialer is used,
	// i.e. if Dial is blank.
	//
	// By default client connects only to ipv4 addresses,
	// since unfortunately ipv6 remains broken in many networks worldwide :)
	DialDualStack bool

	// TLS config for https connections.
	//
	// Default TLS config is used if not set.
	TLSConfig *tls.Config

	// Maximum number of connections per each host which may be established.
	//
	// DefaultMaxConnsPerHost is used if not set.
	MaxConnsPerHost int

	// Idle keep-alive connections are closed after this duration.
	//
	// By default idle connections are closed
	// after DefaultMaxIdleConnDuration.
	MaxIdleConnDuration time.Duration

	// Maximum number of attempts for idempotent calls.
	//
	// DefaultMaxIdemponentCallAttempts is used if not set.
	MaxIdemponentCallAttempts int

	// Per-connection buffer size for responses' reading.
	// This also limits the maximum header size.
	//
	// Default buffer size is used if 0.
	ReadBufferSize int

	// Per-connection buffer size for requests' writing.
	//
	// Default buffer size is used if 0.
	WriteBufferSize int

	// Maximum duration for full response reading (including body).
	//
	// By default response read timeout is unlimited.
	ReadTimeout time.Duration

	// Maximum duration for full request writing (including body).
	//
	// By default request write timeout is unlimited.
	WriteTimeout time.Duration

	// Maximum response body size.
	//
	// The client returns ErrBodyTooLarge if this limit is greater than 0
	// and response body is greater than the limit.
	//
	// By default response body size is unlimited.
	MaxResponseBodySize int

	// Header names are passed as-is without normalization
	// if this option is set.
	//
	// Disabled header names' normalization may be useful only for proxying
	// responses to other clients expecting case-sensitive
	// header names. See https://github.com/valyala/fasthttp/issues/57
	// for details.
	//
	// By default request and response header names are normalized, i.e.
	// The first letter and the first letters following dashes
	// are uppercased, while all the other letters are lowercased.
	// Examples:
	//
	//     * HOST -> Host
	//     * content-type -> Content-Type
	//     * cONTENT-lenGTH -> Content-Length
	DisableHeaderNamesNormalizing bool

	// mLock guards m and ms, the lazily created per-host clients keyed by
	// request host: m is used for http requests and ms for https requests.
	mLock sync.Mutex
	m     map[string]*HostClient
	ms    map[string]*HostClient
}

// Get returns the status code and body of url.
//
// The contents of dst will be replaced by the body and returned. If dst
// is too small, a new slice will be allocated.
//
// The function follows redirects. Use Do* for manually handling redirects.
func (c *Client) Get(dst []byte, url string) (statusCode int, body []byte, err error) {
	return clientGetURL(dst, url, c)
}

// GetTimeout returns the status code and body of url.
//
// The contents of dst will be replaced by the body and returned. If dst
// is too small, a new slice will be allocated.
//
// The function follows redirects. Use Do* for manually handling redirects.
//
// ErrTimeout error is returned if url contents couldn't be fetched
// during the given timeout.
func (c *Client) GetTimeout(dst []byte, url string, timeout time.Duration) (statusCode int, body []byte, err error) {
	return clientGetURLTimeout(dst, url, timeout, c)
}

// GetDeadline returns the status code and body of url.
//
// The contents of dst will be replaced by the body and returned. If dst
// is too small, a new slice will be allocated.
//
// The function follows redirects. Use Do* for manually handling redirects.
//
// ErrTimeout error is returned if url contents couldn't be fetched
// until the given deadline.
func (c *Client) GetDeadline(dst []byte, url string, deadline time.Time) (statusCode int, body []byte, err error) {
	return clientGetURLDeadline(dst, url, deadline, c)
}

// Post sends POST request to the given url with the given POST arguments.
//
// The contents of dst will be replaced by the body and returned. If dst
// is too small, a new slice will be allocated.
//
// The function follows redirects. Use Do* for manually handling redirects.
//
// Empty POST body is sent if postArgs is nil.
func (c *Client) Post(dst []byte, url string, postArgs *Args) (statusCode int, body []byte, err error) {
	return clientPostURL(dst, url, postArgs, c)
}

// DoTimeout performs the given request and waits for response during
// the given timeout duration.
//
// Request must contain at least non-zero RequestURI with full url (including
// scheme and host) or non-zero Host header + RequestURI.
//
// Client determines the server to be requested in the following order:
//
//   - from RequestURI if it contains full url with scheme and host;
//   - from Host header otherwise.
//
// The function doesn't follow redirects. Use Get* for following redirects.
//
// Response is ignored if resp is nil.
//
// ErrTimeout is returned if the response wasn't returned during
// the given timeout.
//
// ErrNoFreeConns is returned if all Client.MaxConnsPerHost connections
// to the requested host are busy.
//
// It is recommended obtaining req and resp via AcquireRequest
// and AcquireResponse in performance-critical code.
//
// Warning: DoTimeout does not terminate the request itself. The request will
// continue in the background and the response will be discarded.
// If requests take too long and the connection pool gets filled up please
// try setting a ReadTimeout.
func (c *Client) DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
	return clientDoTimeout(req, resp, timeout, c)
}

// DoDeadline performs the given request and waits for response until
// the given deadline.
//
// Request must contain at least non-zero RequestURI with full url (including
// scheme and host) or non-zero Host header + RequestURI.
//
// Client determines the server to be requested in the following order:
//
//   - from RequestURI if it contains full url with scheme and host;
//   - from Host header otherwise.
//
// The function doesn't follow redirects. Use Get* for following redirects.
//
// Response is ignored if resp is nil.
//
// ErrTimeout is returned if the response wasn't returned until
// the given deadline.
//
// ErrNoFreeConns is returned if all Client.MaxConnsPerHost connections
// to the requested host are busy.
//
// It is recommended to obtain req and resp via AcquireRequest
// and AcquireResponse in performance-critical code.
func (c *Client) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
	return clientDoDeadline(req, resp, deadline, c)
}

// Do performs the given http request and fills the given http response.
//
// Request must contain at least non-zero RequestURI with full url (including
// scheme and host) or non-zero Host header + RequestURI.
//
// Client determines the server to be requested in the following order:
//
//   - from RequestURI if it contains full url with scheme and host;
//   - from Host header otherwise.
//
// Response is ignored if resp is nil.
371// 372// The function doesn't follow redirects. Use Get* for following redirects. 373// 374// ErrNoFreeConns is returned if all Client.MaxConnsPerHost connections 375// to the requested host are busy. 376// 377// It is recommended obtaining req and resp via AcquireRequest 378// and AcquireResponse in performance-critical code. 379func (c *Client) Do(req *Request, resp *Response) error { 380 uri := req.URI() 381 host := uri.Host() 382 383 isTLS := false 384 scheme := uri.Scheme() 385 if bytes.Equal(scheme, strHTTPS) { 386 isTLS = true 387 } else if !bytes.Equal(scheme, strHTTP) { 388 return fmt.Errorf("unsupported protocol %q. http and https are supported", scheme) 389 } 390 391 startCleaner := false 392 393 c.mLock.Lock() 394 m := c.m 395 if isTLS { 396 m = c.ms 397 } 398 if m == nil { 399 m = make(map[string]*HostClient) 400 if isTLS { 401 c.ms = m 402 } else { 403 c.m = m 404 } 405 } 406 hc := m[string(host)] 407 if hc == nil { 408 hc = &HostClient{ 409 Addr: addMissingPort(string(host), isTLS), 410 Name: c.Name, 411 NoDefaultUserAgentHeader: c.NoDefaultUserAgentHeader, 412 Dial: c.Dial, 413 DialDualStack: c.DialDualStack, 414 IsTLS: isTLS, 415 TLSConfig: c.TLSConfig, 416 MaxConns: c.MaxConnsPerHost, 417 MaxIdleConnDuration: c.MaxIdleConnDuration, 418 MaxIdemponentCallAttempts: c.MaxIdemponentCallAttempts, 419 ReadBufferSize: c.ReadBufferSize, 420 WriteBufferSize: c.WriteBufferSize, 421 ReadTimeout: c.ReadTimeout, 422 WriteTimeout: c.WriteTimeout, 423 MaxResponseBodySize: c.MaxResponseBodySize, 424 DisableHeaderNamesNormalizing: c.DisableHeaderNamesNormalizing, 425 } 426 m[string(host)] = hc 427 if len(m) == 1 { 428 startCleaner = true 429 } 430 } 431 c.mLock.Unlock() 432 433 if startCleaner { 434 go c.mCleaner(m) 435 } 436 437 return hc.Do(req, resp) 438} 439 440func (c *Client) mCleaner(m map[string]*HostClient) { 441 mustStop := false 442 443 for { 444 c.mLock.Lock() 445 for k, v := range m { 446 v.connsLock.Lock() 447 shouldRemove := v.connsCount == 0 448 
v.connsLock.Unlock() 449 450 if shouldRemove { 451 delete(m, k) 452 } 453 } 454 if len(m) == 0 { 455 mustStop = true 456 } 457 c.mLock.Unlock() 458 459 if mustStop { 460 break 461 } 462 time.Sleep(10 * time.Second) 463 } 464} 465 466// DefaultMaxConnsPerHost is the maximum number of concurrent connections 467// http client may establish per host by default (i.e. if 468// Client.MaxConnsPerHost isn't set). 469const DefaultMaxConnsPerHost = 512 470 471// DefaultMaxIdleConnDuration is the default duration before idle keep-alive 472// connection is closed. 473const DefaultMaxIdleConnDuration = 10 * time.Second 474 475// DefaultMaxIdemponentCallAttempts is the default idempotent calls attempts count. 476const DefaultMaxIdemponentCallAttempts = 5 477 478// DialFunc must establish connection to addr. 479// 480// There is no need in establishing TLS (SSL) connection for https. 481// The client automatically converts connection to TLS 482// if HostClient.IsTLS is set. 483// 484// TCP address passed to DialFunc always contains host and port. 485// Example TCP addr values: 486// 487// - foobar.com:80 488// - foobar.com:443 489// - foobar.com:8080 490type DialFunc func(addr string) (net.Conn, error) 491 492// HostClient balances http requests among hosts listed in Addr. 493// 494// HostClient may be used for balancing load among multiple upstream hosts. 495// While multiple addresses passed to HostClient.Addr may be used for balancing 496// load among them, it would be better using LBClient instead, since HostClient 497// may unevenly balance load among upstream hosts. 498// 499// It is forbidden copying HostClient instances. Create new instances instead. 500// 501// It is safe calling HostClient methods from concurrently running goroutines. 502type HostClient struct { 503 noCopy noCopy 504 505 // Comma-separated list of upstream HTTP server host addresses, 506 // which are passed to Dial in a round-robin manner. 
507 // 508 // Each address may contain port if default dialer is used. 509 // For example, 510 // 511 // - foobar.com:80 512 // - foobar.com:443 513 // - foobar.com:8080 514 Addr string 515 516 // Client name. Used in User-Agent request header. 517 Name string 518 519 // NoDefaultUserAgentHeader when set to true, causes the default 520 // User-Agent header to be excluded from the Request. 521 NoDefaultUserAgentHeader bool 522 523 // Callback for establishing new connection to the host. 524 // 525 // Default Dial is used if not set. 526 Dial DialFunc 527 528 // Attempt to connect to both ipv4 and ipv6 host addresses 529 // if set to true. 530 // 531 // This option is used only if default TCP dialer is used, 532 // i.e. if Dial is blank. 533 // 534 // By default client connects only to ipv4 addresses, 535 // since unfortunately ipv6 remains broken in many networks worldwide :) 536 DialDualStack bool 537 538 // Whether to use TLS (aka SSL or HTTPS) for host connections. 539 IsTLS bool 540 541 // Optional TLS config. 542 TLSConfig *tls.Config 543 544 // Maximum number of connections which may be established to all hosts 545 // listed in Addr. 546 // 547 // You can change this value while the HostClient is being used 548 // using HostClient.SetMaxConns(value) 549 // 550 // DefaultMaxConnsPerHost is used if not set. 551 MaxConns int 552 553 // Keep-alive connections are closed after this duration. 554 // 555 // By default connection duration is unlimited. 556 MaxConnDuration time.Duration 557 558 // Idle keep-alive connections are closed after this duration. 559 // 560 // By default idle connections are closed 561 // after DefaultMaxIdleConnDuration. 562 MaxIdleConnDuration time.Duration 563 564 // Maximum number of attempts for idempotent calls 565 // 566 // DefaultMaxIdemponentCallAttempts is used if not set. 567 MaxIdemponentCallAttempts int 568 569 // Per-connection buffer size for responses' reading. 570 // This also limits the maximum header size. 
571 // 572 // Default buffer size is used if 0. 573 ReadBufferSize int 574 575 // Per-connection buffer size for requests' writing. 576 // 577 // Default buffer size is used if 0. 578 WriteBufferSize int 579 580 // Maximum duration for full response reading (including body). 581 // 582 // By default response read timeout is unlimited. 583 ReadTimeout time.Duration 584 585 // Maximum duration for full request writing (including body). 586 // 587 // By default request write timeout is unlimited. 588 WriteTimeout time.Duration 589 590 // Maximum response body size. 591 // 592 // The client returns ErrBodyTooLarge if this limit is greater than 0 593 // and response body is greater than the limit. 594 // 595 // By default response body size is unlimited. 596 MaxResponseBodySize int 597 598 // Header names are passed as-is without normalization 599 // if this option is set. 600 // 601 // Disabled header names' normalization may be useful only for proxying 602 // responses to other clients expecting case-sensitive 603 // header names. See https://github.com/valyala/fasthttp/issues/57 604 // for details. 605 // 606 // By default request and response header names are normalized, i.e. 607 // The first letter and the first letters following dashes 608 // are uppercased, while all the other letters are lowercased. 
609 // Examples: 610 // 611 // * HOST -> Host 612 // * content-type -> Content-Type 613 // * cONTENT-lenGTH -> Content-Length 614 DisableHeaderNamesNormalizing bool 615 616 clientName atomic.Value 617 lastUseTime uint32 618 619 connsLock sync.Mutex 620 connsCount int 621 conns []*clientConn 622 623 addrsLock sync.Mutex 624 addrs []string 625 addrIdx uint32 626 627 tlsConfigMap map[string]*tls.Config 628 tlsConfigMapLock sync.Mutex 629 630 readerPool sync.Pool 631 writerPool sync.Pool 632 633 pendingRequests int32 634 635 connsCleanerRun bool 636} 637 638type clientConn struct { 639 c net.Conn 640 641 createdTime time.Time 642 lastUseTime time.Time 643} 644 645var startTimeUnix = time.Now().Unix() 646 647// LastUseTime returns time the client was last used 648func (c *HostClient) LastUseTime() time.Time { 649 n := atomic.LoadUint32(&c.lastUseTime) 650 return time.Unix(startTimeUnix+int64(n), 0) 651} 652 653// Get returns the status code and body of url. 654// 655// The contents of dst will be replaced by the body and returned, if the dst 656// is too small a new slice will be allocated. 657// 658// The function follows redirects. Use Do* for manually handling redirects. 659func (c *HostClient) Get(dst []byte, url string) (statusCode int, body []byte, err error) { 660 return clientGetURL(dst, url, c) 661} 662 663// GetTimeout returns the status code and body of url. 664// 665// The contents of dst will be replaced by the body and returned, if the dst 666// is too small a new slice will be allocated. 667// 668// The function follows redirects. Use Do* for manually handling redirects. 669// 670// ErrTimeout error is returned if url contents couldn't be fetched 671// during the given timeout. 672func (c *HostClient) GetTimeout(dst []byte, url string, timeout time.Duration) (statusCode int, body []byte, err error) { 673 return clientGetURLTimeout(dst, url, timeout, c) 674} 675 676// GetDeadline returns the status code and body of url. 
677// 678// The contents of dst will be replaced by the body and returned, if the dst 679// is too small a new slice will be allocated. 680// 681// The function follows redirects. Use Do* for manually handling redirects. 682// 683// ErrTimeout error is returned if url contents couldn't be fetched 684// until the given deadline. 685func (c *HostClient) GetDeadline(dst []byte, url string, deadline time.Time) (statusCode int, body []byte, err error) { 686 return clientGetURLDeadline(dst, url, deadline, c) 687} 688 689// Post sends POST request to the given url with the given POST arguments. 690// 691// The contents of dst will be replaced by the body and returned, if the dst 692// is too small a new slice will be allocated. 693// 694// The function follows redirects. Use Do* for manually handling redirects. 695// 696// Empty POST body is sent if postArgs is nil. 697func (c *HostClient) Post(dst []byte, url string, postArgs *Args) (statusCode int, body []byte, err error) { 698 return clientPostURL(dst, url, postArgs, c) 699} 700 701type clientDoer interface { 702 Do(req *Request, resp *Response) error 703} 704 705func clientGetURL(dst []byte, url string, c clientDoer) (statusCode int, body []byte, err error) { 706 req := AcquireRequest() 707 708 statusCode, body, err = doRequestFollowRedirects(req, dst, url, c) 709 710 ReleaseRequest(req) 711 return statusCode, body, err 712} 713 714func clientGetURLTimeout(dst []byte, url string, timeout time.Duration, c clientDoer) (statusCode int, body []byte, err error) { 715 deadline := time.Now().Add(timeout) 716 return clientGetURLDeadline(dst, url, deadline, c) 717} 718 719type clientURLResponse struct { 720 statusCode int 721 body []byte 722 err error 723} 724 725func clientGetURLDeadline(dst []byte, url string, deadline time.Time, c clientDoer) (statusCode int, body []byte, err error) { 726 timeout := -time.Since(deadline) 727 if timeout <= 0 { 728 return 0, dst, ErrTimeout 729 } 730 731 var ch chan clientURLResponse 732 chv 
:= clientURLResponseChPool.Get() 733 if chv == nil { 734 chv = make(chan clientURLResponse, 1) 735 } 736 ch = chv.(chan clientURLResponse) 737 738 req := AcquireRequest() 739 740 // Note that the request continues execution on ErrTimeout until 741 // client-specific ReadTimeout exceeds. This helps limiting load 742 // on slow hosts by MaxConns* concurrent requests. 743 // 744 // Without this 'hack' the load on slow host could exceed MaxConns* 745 // concurrent requests, since timed out requests on client side 746 // usually continue execution on the host. 747 go func() { 748 statusCodeCopy, bodyCopy, errCopy := doRequestFollowRedirects(req, dst, url, c) 749 ch <- clientURLResponse{ 750 statusCode: statusCodeCopy, 751 body: bodyCopy, 752 err: errCopy, 753 } 754 }() 755 756 tc := AcquireTimer(timeout) 757 select { 758 case resp := <-ch: 759 ReleaseRequest(req) 760 clientURLResponseChPool.Put(chv) 761 statusCode = resp.statusCode 762 body = resp.body 763 err = resp.err 764 case <-tc.C: 765 body = dst 766 err = ErrTimeout 767 } 768 ReleaseTimer(tc) 769 770 return statusCode, body, err 771} 772 773var clientURLResponseChPool sync.Pool 774 775func clientPostURL(dst []byte, url string, postArgs *Args, c clientDoer) (statusCode int, body []byte, err error) { 776 req := AcquireRequest() 777 req.Header.SetMethodBytes(strPost) 778 req.Header.SetContentTypeBytes(strPostArgsContentType) 779 if postArgs != nil { 780 postArgs.WriteTo(req.BodyWriter()) 781 } 782 783 statusCode, body, err = doRequestFollowRedirects(req, dst, url, c) 784 785 ReleaseRequest(req) 786 return statusCode, body, err 787} 788 789var ( 790 errMissingLocation = errors.New("missing Location header for http redirect") 791 errTooManyRedirects = errors.New("too many redirects detected when doing the request") 792) 793 794const maxRedirectsCount = 16 795 796func doRequestFollowRedirects(req *Request, dst []byte, url string, c clientDoer) (statusCode int, body []byte, err error) { 797 resp := AcquireResponse() 798 
bodyBuf := resp.bodyBuffer() 799 resp.keepBodyBuffer = true 800 oldBody := bodyBuf.B 801 bodyBuf.B = dst 802 scheme := req.uri.Scheme() 803 req.schemaUpdate = false 804 805 redirectsCount := 0 806 for { 807 // In case redirect to different scheme 808 if redirectsCount > 0 && !bytes.Equal(scheme, req.uri.Scheme()) { 809 if strings.HasPrefix(url, string(strHTTPS)) { 810 req.isTLS = true 811 req.uri.SetSchemeBytes(strHTTPS) 812 } else { 813 req.isTLS = false 814 req.uri.SetSchemeBytes(strHTTP) 815 } 816 scheme = req.uri.Scheme() 817 req.schemaUpdate = true 818 } 819 820 req.parsedURI = false 821 req.Header.host = req.Header.host[:0] 822 req.SetRequestURI(url) 823 824 if err = c.Do(req, resp); err != nil { 825 break 826 } 827 statusCode = resp.Header.StatusCode() 828 if statusCode != StatusMovedPermanently && 829 statusCode != StatusFound && 830 statusCode != StatusSeeOther && 831 statusCode != StatusTemporaryRedirect && 832 statusCode != StatusPermanentRedirect { 833 break 834 } 835 836 redirectsCount++ 837 if redirectsCount > maxRedirectsCount { 838 err = errTooManyRedirects 839 break 840 } 841 location := resp.Header.peek(strLocation) 842 if len(location) == 0 { 843 err = errMissingLocation 844 break 845 } 846 url = getRedirectURL(url, location) 847 } 848 849 body = bodyBuf.B 850 bodyBuf.B = oldBody 851 resp.keepBodyBuffer = false 852 ReleaseResponse(resp) 853 854 return statusCode, body, err 855} 856 857func getRedirectURL(baseURL string, location []byte) string { 858 u := AcquireURI() 859 u.Update(baseURL) 860 u.UpdateBytes(location) 861 redirectURL := u.String() 862 ReleaseURI(u) 863 return redirectURL 864} 865 866var ( 867 requestPool sync.Pool 868 responsePool sync.Pool 869) 870 871// AcquireRequest returns an empty Request instance from request pool. 872// 873// The returned Request instance may be passed to ReleaseRequest when it is 874// no longer needed. This allows Request recycling, reduces GC pressure 875// and usually improves performance. 
876func AcquireRequest() *Request { 877 v := requestPool.Get() 878 if v == nil { 879 return &Request{} 880 } 881 return v.(*Request) 882} 883 884// ReleaseRequest returns req acquired via AcquireRequest to request pool. 885// 886// It is forbidden accessing req and/or its' members after returning 887// it to request pool. 888func ReleaseRequest(req *Request) { 889 req.Reset() 890 requestPool.Put(req) 891} 892 893// AcquireResponse returns an empty Response instance from response pool. 894// 895// The returned Response instance may be passed to ReleaseResponse when it is 896// no longer needed. This allows Response recycling, reduces GC pressure 897// and usually improves performance. 898func AcquireResponse() *Response { 899 v := responsePool.Get() 900 if v == nil { 901 return &Response{} 902 } 903 return v.(*Response) 904} 905 906// ReleaseResponse return resp acquired via AcquireResponse to response pool. 907// 908// It is forbidden accessing resp and/or its' members after returning 909// it to response pool. 910func ReleaseResponse(resp *Response) { 911 resp.Reset() 912 responsePool.Put(resp) 913} 914 915// DoTimeout performs the given request and waits for response during 916// the given timeout duration. 917// 918// Request must contain at least non-zero RequestURI with full url (including 919// scheme and host) or non-zero Host header + RequestURI. 920// 921// The function doesn't follow redirects. Use Get* for following redirects. 922// 923// Response is ignored if resp is nil. 924// 925// ErrTimeout is returned if the response wasn't returned during 926// the given timeout. 927// 928// ErrNoFreeConns is returned if all HostClient.MaxConns connections 929// to the host are busy. 930// 931// It is recommended obtaining req and resp via AcquireRequest 932// and AcquireResponse in performance-critical code. 933// 934// Warning: DoTimeout does not terminate the request itself. The request will 935// continue in the background and the response will be discarded. 
936// If requests take too long and the connection pool gets filled up please 937// try setting a ReadTimeout. 938func (c *HostClient) DoTimeout(req *Request, resp *Response, timeout time.Duration) error { 939 return clientDoTimeout(req, resp, timeout, c) 940} 941 942// DoDeadline performs the given request and waits for response until 943// the given deadline. 944// 945// Request must contain at least non-zero RequestURI with full url (including 946// scheme and host) or non-zero Host header + RequestURI. 947// 948// The function doesn't follow redirects. Use Get* for following redirects. 949// 950// Response is ignored if resp is nil. 951// 952// ErrTimeout is returned if the response wasn't returned until 953// the given deadline. 954// 955// ErrNoFreeConns is returned if all HostClient.MaxConns connections 956// to the host are busy. 957// 958// It is recommended obtaining req and resp via AcquireRequest 959// and AcquireResponse in performance-critical code. 960func (c *HostClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error { 961 return clientDoDeadline(req, resp, deadline, c) 962} 963 964func clientDoTimeout(req *Request, resp *Response, timeout time.Duration, c clientDoer) error { 965 deadline := time.Now().Add(timeout) 966 return clientDoDeadline(req, resp, deadline, c) 967} 968 969func clientDoDeadline(req *Request, resp *Response, deadline time.Time, c clientDoer) error { 970 timeout := -time.Since(deadline) 971 if timeout <= 0 { 972 return ErrTimeout 973 } 974 975 var ch chan error 976 chv := errorChPool.Get() 977 if chv == nil { 978 chv = make(chan error, 1) 979 } 980 ch = chv.(chan error) 981 982 // Make req and resp copies, since on timeout they no longer 983 // may be accessed. 
984 reqCopy := AcquireRequest() 985 req.copyToSkipBody(reqCopy) 986 swapRequestBody(req, reqCopy) 987 respCopy := AcquireResponse() 988 // Not calling resp.copyToSkipBody(respCopy) here to avoid 989 // unexpected messing with headers 990 respCopy.SkipBody = resp.SkipBody 991 992 // Note that the request continues execution on ErrTimeout until 993 // client-specific ReadTimeout exceeds. This helps limiting load 994 // on slow hosts by MaxConns* concurrent requests. 995 // 996 // Without this 'hack' the load on slow host could exceed MaxConns* 997 // concurrent requests, since timed out requests on client side 998 // usually continue execution on the host. 999 1000 var cleanup int32 1001 go func() { 1002 errDo := c.Do(reqCopy, respCopy) 1003 if atomic.LoadInt32(&cleanup) == 1 { 1004 ReleaseResponse(respCopy) 1005 ReleaseRequest(reqCopy) 1006 errorChPool.Put(chv) 1007 } else { 1008 ch <- errDo 1009 } 1010 }() 1011 1012 tc := AcquireTimer(timeout) 1013 var err error 1014 select { 1015 case err = <-ch: 1016 if resp != nil { 1017 respCopy.copyToSkipBody(resp) 1018 swapResponseBody(resp, respCopy) 1019 } 1020 swapRequestBody(reqCopy, req) 1021 ReleaseResponse(respCopy) 1022 ReleaseRequest(reqCopy) 1023 errorChPool.Put(chv) 1024 case <-tc.C: 1025 atomic.StoreInt32(&cleanup, 1) 1026 err = ErrTimeout 1027 } 1028 ReleaseTimer(tc) 1029 1030 return err 1031} 1032 1033var errorChPool sync.Pool 1034 1035// Do performs the given http request and sets the corresponding response. 1036// 1037// Request must contain at least non-zero RequestURI with full url (including 1038// scheme and host) or non-zero Host header + RequestURI. 1039// 1040// The function doesn't follow redirects. Use Get* for following redirects. 1041// 1042// Response is ignored if resp is nil. 1043// 1044// ErrNoFreeConns is returned if all HostClient.MaxConns connections 1045// to the host are busy. 
1046// 1047// It is recommended obtaining req and resp via AcquireRequest 1048// and AcquireResponse in performance-critical code. 1049func (c *HostClient) Do(req *Request, resp *Response) error { 1050 var err error 1051 var retry bool 1052 maxAttempts := c.MaxIdemponentCallAttempts 1053 if maxAttempts <= 0 { 1054 maxAttempts = DefaultMaxIdemponentCallAttempts 1055 } 1056 attempts := 0 1057 1058 atomic.AddInt32(&c.pendingRequests, 1) 1059 for { 1060 retry, err = c.do(req, resp) 1061 if err == nil || !retry { 1062 break 1063 } 1064 1065 if !isIdempotent(req) { 1066 // Retry non-idempotent requests if the server closes 1067 // the connection before sending the response. 1068 // 1069 // This case is possible if the server closes the idle 1070 // keep-alive connection on timeout. 1071 // 1072 // Apache and nginx usually do this. 1073 if err != io.EOF { 1074 break 1075 } 1076 } 1077 attempts++ 1078 if attempts >= maxAttempts { 1079 break 1080 } 1081 } 1082 atomic.AddInt32(&c.pendingRequests, -1) 1083 1084 if err == io.EOF { 1085 err = ErrConnectionClosed 1086 } 1087 return err 1088} 1089 1090// PendingRequests returns the current number of requests the client 1091// is executing. 1092// 1093// This function may be used for balancing load among multiple HostClient 1094// instances. 
1095func (c *HostClient) PendingRequests() int { 1096 return int(atomic.LoadInt32(&c.pendingRequests)) 1097} 1098 1099func isIdempotent(req *Request) bool { 1100 return req.Header.IsGet() || req.Header.IsHead() || req.Header.IsPut() 1101} 1102 1103func (c *HostClient) do(req *Request, resp *Response) (bool, error) { 1104 nilResp := false 1105 if resp == nil { 1106 nilResp = true 1107 resp = AcquireResponse() 1108 } 1109 1110 ok, err := c.doNonNilReqResp(req, resp) 1111 1112 if nilResp { 1113 ReleaseResponse(resp) 1114 } 1115 1116 return ok, err 1117} 1118 1119func (c *HostClient) doNonNilReqResp(req *Request, resp *Response) (bool, error) { 1120 if req == nil { 1121 panic("BUG: req cannot be nil") 1122 } 1123 if resp == nil { 1124 panic("BUG: resp cannot be nil") 1125 } 1126 1127 atomic.StoreUint32(&c.lastUseTime, uint32(time.Now().Unix()-startTimeUnix)) 1128 1129 // Free up resources occupied by response before sending the request, 1130 // so the GC may reclaim these resources (e.g. response body). 
1131 resp.Reset() 1132 1133 // If we detected a redirect to another schema 1134 if req.schemaUpdate { 1135 c.IsTLS = bytes.Equal(req.URI().Scheme(), strHTTPS) 1136 c.Addr = addMissingPort(string(req.Host()), c.IsTLS) 1137 c.addrIdx = 0 1138 c.addrs = nil 1139 req.schemaUpdate = false 1140 req.SetConnectionClose() 1141 } 1142 1143 cc, err := c.acquireConn() 1144 if err != nil { 1145 return false, err 1146 } 1147 conn := cc.c 1148 1149 resp.parseNetConn(conn) 1150 1151 if c.WriteTimeout > 0 { 1152 // Set Deadline every time, since golang has fixed the performance issue 1153 // See https://github.com/golang/go/issues/15133#issuecomment-271571395 for details 1154 currentTime := time.Now() 1155 if err = conn.SetWriteDeadline(currentTime.Add(c.WriteTimeout)); err != nil { 1156 c.closeConn(cc) 1157 return true, err 1158 } 1159 } 1160 1161 resetConnection := false 1162 if c.MaxConnDuration > 0 && time.Since(cc.createdTime) > c.MaxConnDuration && !req.ConnectionClose() { 1163 req.SetConnectionClose() 1164 resetConnection = true 1165 } 1166 1167 userAgentOld := req.Header.UserAgent() 1168 if len(userAgentOld) == 0 { 1169 req.Header.userAgent = c.getClientName() 1170 } 1171 bw := c.acquireWriter(conn) 1172 err = req.Write(bw) 1173 1174 if resetConnection { 1175 req.Header.ResetConnectionClose() 1176 } 1177 1178 if err == nil { 1179 err = bw.Flush() 1180 } 1181 if err != nil { 1182 c.releaseWriter(bw) 1183 c.closeConn(cc) 1184 return true, err 1185 } 1186 c.releaseWriter(bw) 1187 1188 if c.ReadTimeout > 0 { 1189 // Set Deadline every time, since golang has fixed the performance issue 1190 // See https://github.com/golang/go/issues/15133#issuecomment-271571395 for details 1191 currentTime := time.Now() 1192 if err = conn.SetReadDeadline(currentTime.Add(c.ReadTimeout)); err != nil { 1193 c.closeConn(cc) 1194 return true, err 1195 } 1196 } 1197 1198 if !req.Header.IsGet() && req.Header.IsHead() { 1199 resp.SkipBody = true 1200 } 1201 if c.DisableHeaderNamesNormalizing { 1202 
resp.Header.DisableNormalizing() 1203 } 1204 1205 br := c.acquireReader(conn) 1206 if err = resp.ReadLimitBody(br, c.MaxResponseBodySize); err != nil { 1207 c.releaseReader(br) 1208 c.closeConn(cc) 1209 // Don't retry in case of ErrBodyTooLarge since we will just get the same again. 1210 retry := err != ErrBodyTooLarge 1211 return retry, err 1212 } 1213 c.releaseReader(br) 1214 1215 if resetConnection || req.ConnectionClose() || resp.ConnectionClose() { 1216 c.closeConn(cc) 1217 } else { 1218 c.releaseConn(cc) 1219 } 1220 1221 return false, err 1222} 1223 1224var ( 1225 // ErrNoFreeConns is returned when no free connections available 1226 // to the given host. 1227 // 1228 // Increase the allowed number of connections per host if you 1229 // see this error. 1230 ErrNoFreeConns = errors.New("no free connections available to host") 1231 1232 // ErrTimeout is returned from timed out calls. 1233 ErrTimeout = errors.New("timeout") 1234 1235 // ErrConnectionClosed may be returned from client methods if the server 1236 // closes connection before returning the first response byte. 1237 // 1238 // If you see this error, then either fix the server by returning 1239 // 'Connection: close' response header before closing the connection 1240 // or add 'Connection: close' request header before sending requests 1241 // to broken server. 1242 ErrConnectionClosed = errors.New("the server closed connection before returning the first response byte. 
" + 1243 "Make sure the server returns 'Connection: close' response header before closing the connection") 1244) 1245 1246func (c *HostClient) SetMaxConns(newMaxConns int) { 1247 c.connsLock.Lock() 1248 c.MaxConns = newMaxConns 1249 c.connsLock.Unlock() 1250} 1251 1252func (c *HostClient) acquireConn() (*clientConn, error) { 1253 var cc *clientConn 1254 createConn := false 1255 startCleaner := false 1256 1257 var n int 1258 c.connsLock.Lock() 1259 n = len(c.conns) 1260 if n == 0 { 1261 maxConns := c.MaxConns 1262 if maxConns <= 0 { 1263 maxConns = DefaultMaxConnsPerHost 1264 } 1265 if c.connsCount < maxConns { 1266 c.connsCount++ 1267 createConn = true 1268 if !c.connsCleanerRun { 1269 startCleaner = true 1270 c.connsCleanerRun = true 1271 } 1272 } 1273 } else { 1274 n-- 1275 cc = c.conns[n] 1276 c.conns[n] = nil 1277 c.conns = c.conns[:n] 1278 } 1279 c.connsLock.Unlock() 1280 1281 if cc != nil { 1282 return cc, nil 1283 } 1284 if !createConn { 1285 return nil, ErrNoFreeConns 1286 } 1287 1288 if startCleaner { 1289 go c.connsCleaner() 1290 } 1291 1292 conn, err := c.dialHostHard() 1293 if err != nil { 1294 c.decConnsCount() 1295 return nil, err 1296 } 1297 cc = acquireClientConn(conn) 1298 1299 return cc, nil 1300} 1301 1302func (c *HostClient) connsCleaner() { 1303 var ( 1304 scratch []*clientConn 1305 maxIdleConnDuration = c.MaxIdleConnDuration 1306 ) 1307 if maxIdleConnDuration <= 0 { 1308 maxIdleConnDuration = DefaultMaxIdleConnDuration 1309 } 1310 for { 1311 currentTime := time.Now() 1312 1313 // Determine idle connections to be closed. 1314 c.connsLock.Lock() 1315 conns := c.conns 1316 n := len(conns) 1317 i := 0 1318 for i < n && currentTime.Sub(conns[i].lastUseTime) > maxIdleConnDuration { 1319 i++ 1320 } 1321 sleepFor := maxIdleConnDuration 1322 if i < n { 1323 // + 1 so we actually sleep past the expiration time and not up to it. 1324 // Otherwise the > check above would still fail. 
1325 sleepFor = maxIdleConnDuration - currentTime.Sub(conns[i].lastUseTime) + 1 1326 } 1327 scratch = append(scratch[:0], conns[:i]...) 1328 if i > 0 { 1329 m := copy(conns, conns[i:]) 1330 for i = m; i < n; i++ { 1331 conns[i] = nil 1332 } 1333 c.conns = conns[:m] 1334 } 1335 c.connsLock.Unlock() 1336 1337 // Close idle connections. 1338 for i, cc := range scratch { 1339 c.closeConn(cc) 1340 scratch[i] = nil 1341 } 1342 1343 // Determine whether to stop the connsCleaner. 1344 c.connsLock.Lock() 1345 mustStop := c.connsCount == 0 1346 if mustStop { 1347 c.connsCleanerRun = false 1348 } 1349 c.connsLock.Unlock() 1350 if mustStop { 1351 break 1352 } 1353 1354 time.Sleep(sleepFor) 1355 } 1356} 1357 1358func (c *HostClient) closeConn(cc *clientConn) { 1359 c.decConnsCount() 1360 cc.c.Close() 1361 releaseClientConn(cc) 1362} 1363 1364func (c *HostClient) decConnsCount() { 1365 c.connsLock.Lock() 1366 c.connsCount-- 1367 c.connsLock.Unlock() 1368} 1369 1370func acquireClientConn(conn net.Conn) *clientConn { 1371 v := clientConnPool.Get() 1372 if v == nil { 1373 v = &clientConn{} 1374 } 1375 cc := v.(*clientConn) 1376 cc.c = conn 1377 cc.createdTime = time.Now() 1378 return cc 1379} 1380 1381func releaseClientConn(cc *clientConn) { 1382 // Reset all fields. 
1383 *cc = clientConn{} 1384 clientConnPool.Put(cc) 1385} 1386 1387var clientConnPool sync.Pool 1388 1389func (c *HostClient) releaseConn(cc *clientConn) { 1390 cc.lastUseTime = time.Now() 1391 c.connsLock.Lock() 1392 c.conns = append(c.conns, cc) 1393 c.connsLock.Unlock() 1394} 1395 1396func (c *HostClient) acquireWriter(conn net.Conn) *bufio.Writer { 1397 v := c.writerPool.Get() 1398 if v == nil { 1399 n := c.WriteBufferSize 1400 if n <= 0 { 1401 n = defaultWriteBufferSize 1402 } 1403 return bufio.NewWriterSize(conn, n) 1404 } 1405 bw := v.(*bufio.Writer) 1406 bw.Reset(conn) 1407 return bw 1408} 1409 1410func (c *HostClient) releaseWriter(bw *bufio.Writer) { 1411 c.writerPool.Put(bw) 1412} 1413 1414func (c *HostClient) acquireReader(conn net.Conn) *bufio.Reader { 1415 v := c.readerPool.Get() 1416 if v == nil { 1417 n := c.ReadBufferSize 1418 if n <= 0 { 1419 n = defaultReadBufferSize 1420 } 1421 return bufio.NewReaderSize(conn, n) 1422 } 1423 br := v.(*bufio.Reader) 1424 br.Reset(conn) 1425 return br 1426} 1427 1428func (c *HostClient) releaseReader(br *bufio.Reader) { 1429 c.readerPool.Put(br) 1430} 1431 1432func newClientTLSConfig(c *tls.Config, addr string) *tls.Config { 1433 if c == nil { 1434 c = &tls.Config{} 1435 } else { 1436 // TODO: substitute this with c.Clone() after go1.8 becomes mainstream :) 1437 c = &tls.Config{ 1438 Rand: c.Rand, 1439 Time: c.Time, 1440 Certificates: c.Certificates, 1441 NameToCertificate: c.NameToCertificate, 1442 GetCertificate: c.GetCertificate, 1443 RootCAs: c.RootCAs, 1444 NextProtos: c.NextProtos, 1445 ServerName: c.ServerName, 1446 1447 // Do not copy ClientAuth, since it is server-related stuff 1448 // Do not copy ClientCAs, since it is server-related stuff 1449 1450 InsecureSkipVerify: c.InsecureSkipVerify, 1451 CipherSuites: c.CipherSuites, 1452 1453 // Do not copy PreferServerCipherSuites - this is server stuff 1454 1455 SessionTicketsDisabled: c.SessionTicketsDisabled, 1456 1457 // Do not copy SessionTicketKey - this 
is server stuff 1458 1459 ClientSessionCache: c.ClientSessionCache, 1460 MinVersion: c.MinVersion, 1461 MaxVersion: c.MaxVersion, 1462 CurvePreferences: c.CurvePreferences, 1463 } 1464 } 1465 1466 if c.ClientSessionCache == nil { 1467 c.ClientSessionCache = tls.NewLRUClientSessionCache(0) 1468 } 1469 1470 if len(c.ServerName) == 0 { 1471 serverName := tlsServerName(addr) 1472 if serverName == "*" { 1473 c.InsecureSkipVerify = true 1474 } else { 1475 c.ServerName = serverName 1476 } 1477 } 1478 return c 1479} 1480 1481func tlsServerName(addr string) string { 1482 if !strings.Contains(addr, ":") { 1483 return addr 1484 } 1485 host, _, err := net.SplitHostPort(addr) 1486 if err != nil { 1487 return "*" 1488 } 1489 return host 1490} 1491 1492func (c *HostClient) nextAddr() string { 1493 c.addrsLock.Lock() 1494 if c.addrs == nil { 1495 c.addrs = strings.Split(c.Addr, ",") 1496 } 1497 addr := c.addrs[0] 1498 if len(c.addrs) > 1 { 1499 addr = c.addrs[c.addrIdx%uint32(len(c.addrs))] 1500 c.addrIdx++ 1501 } 1502 c.addrsLock.Unlock() 1503 return addr 1504} 1505 1506func (c *HostClient) dialHostHard() (conn net.Conn, err error) { 1507 // attempt to dial all the available hosts before giving up. 1508 1509 c.addrsLock.Lock() 1510 n := len(c.addrs) 1511 c.addrsLock.Unlock() 1512 1513 if n == 0 { 1514 // It looks like c.addrs isn't initialized yet. 
1515 n = 1 1516 } 1517 1518 timeout := c.ReadTimeout + c.WriteTimeout 1519 if timeout <= 0 { 1520 timeout = DefaultDialTimeout 1521 } 1522 deadline := time.Now().Add(timeout) 1523 for n > 0 { 1524 addr := c.nextAddr() 1525 tlsConfig := c.cachedTLSConfig(addr) 1526 conn, err = dialAddr(addr, c.Dial, c.DialDualStack, c.IsTLS, tlsConfig) 1527 if err == nil { 1528 return conn, nil 1529 } 1530 if time.Since(deadline) >= 0 { 1531 break 1532 } 1533 n-- 1534 } 1535 return nil, err 1536} 1537 1538func (c *HostClient) cachedTLSConfig(addr string) *tls.Config { 1539 if !c.IsTLS { 1540 return nil 1541 } 1542 1543 c.tlsConfigMapLock.Lock() 1544 if c.tlsConfigMap == nil { 1545 c.tlsConfigMap = make(map[string]*tls.Config) 1546 } 1547 cfg := c.tlsConfigMap[addr] 1548 if cfg == nil { 1549 cfg = newClientTLSConfig(c.TLSConfig, addr) 1550 c.tlsConfigMap[addr] = cfg 1551 } 1552 c.tlsConfigMapLock.Unlock() 1553 1554 return cfg 1555} 1556 1557func dialAddr(addr string, dial DialFunc, dialDualStack, isTLS bool, tlsConfig *tls.Config) (net.Conn, error) { 1558 if dial == nil { 1559 if dialDualStack { 1560 dial = DialDualStack 1561 } else { 1562 dial = Dial 1563 } 1564 addr = addMissingPort(addr, isTLS) 1565 } 1566 conn, err := dial(addr) 1567 if err != nil { 1568 return nil, err 1569 } 1570 if conn == nil { 1571 panic("BUG: DialFunc returned (nil, nil)") 1572 } 1573 if isTLS { 1574 conn = tls.Client(conn, tlsConfig) 1575 } 1576 return conn, nil 1577} 1578 1579func (c *HostClient) getClientName() []byte { 1580 v := c.clientName.Load() 1581 var clientName []byte 1582 if v == nil { 1583 clientName = []byte(c.Name) 1584 if len(clientName) == 0 && !c.NoDefaultUserAgentHeader { 1585 clientName = defaultUserAgent 1586 } 1587 c.clientName.Store(clientName) 1588 } else { 1589 clientName = v.([]byte) 1590 } 1591 return clientName 1592} 1593 1594func addMissingPort(addr string, isTLS bool) string { 1595 n := strings.Index(addr, ":") 1596 if n >= 0 { 1597 return addr 1598 } 1599 port := 80 1600 if 
isTLS { 1601 port = 443 1602 } 1603 return fmt.Sprintf("%s:%d", addr, port) 1604} 1605 1606// PipelineClient pipelines requests over a limited set of concurrent 1607// connections to the given Addr. 1608// 1609// This client may be used in highly loaded HTTP-based RPC systems for reducing 1610// context switches and network level overhead. 1611// See https://en.wikipedia.org/wiki/HTTP_pipelining for details. 1612// 1613// It is forbidden copying PipelineClient instances. Create new instances 1614// instead. 1615// 1616// It is safe calling PipelineClient methods from concurrently running 1617// goroutines. 1618type PipelineClient struct { 1619 noCopy noCopy 1620 1621 // Address of the host to connect to. 1622 Addr string 1623 1624 // The maximum number of concurrent connections to the Addr. 1625 // 1626 // A single connection is used by default. 1627 MaxConns int 1628 1629 // The maximum number of pending pipelined requests over 1630 // a single connection to Addr. 1631 // 1632 // DefaultMaxPendingRequests is used by default. 1633 MaxPendingRequests int 1634 1635 // The maximum delay before sending pipelined requests as a batch 1636 // to the server. 1637 // 1638 // By default requests are sent immediately to the server. 1639 MaxBatchDelay time.Duration 1640 1641 // Callback for connection establishing to the host. 1642 // 1643 // Default Dial is used if not set. 1644 Dial DialFunc 1645 1646 // Attempt to connect to both ipv4 and ipv6 host addresses 1647 // if set to true. 1648 // 1649 // This option is used only if default TCP dialer is used, 1650 // i.e. if Dial is blank. 1651 // 1652 // By default client connects only to ipv4 addresses, 1653 // since unfortunately ipv6 remains broken in many networks worldwide :) 1654 DialDualStack bool 1655 1656 // Whether to use TLS (aka SSL or HTTPS) for host connections. 1657 IsTLS bool 1658 1659 // Optional TLS config. 1660 TLSConfig *tls.Config 1661 1662 // Idle connection to the host is closed after this duration. 
1663 // 1664 // By default idle connection is closed after 1665 // DefaultMaxIdleConnDuration. 1666 MaxIdleConnDuration time.Duration 1667 1668 // Buffer size for responses' reading. 1669 // This also limits the maximum header size. 1670 // 1671 // Default buffer size is used if 0. 1672 ReadBufferSize int 1673 1674 // Buffer size for requests' writing. 1675 // 1676 // Default buffer size is used if 0. 1677 WriteBufferSize int 1678 1679 // Maximum duration for full response reading (including body). 1680 // 1681 // By default response read timeout is unlimited. 1682 ReadTimeout time.Duration 1683 1684 // Maximum duration for full request writing (including body). 1685 // 1686 // By default request write timeout is unlimited. 1687 WriteTimeout time.Duration 1688 1689 // Logger for logging client errors. 1690 // 1691 // By default standard logger from log package is used. 1692 Logger Logger 1693 1694 connClients []*pipelineConnClient 1695 connClientsLock sync.Mutex 1696} 1697 1698type pipelineConnClient struct { 1699 noCopy noCopy 1700 1701 Addr string 1702 MaxPendingRequests int 1703 MaxBatchDelay time.Duration 1704 Dial DialFunc 1705 DialDualStack bool 1706 IsTLS bool 1707 TLSConfig *tls.Config 1708 MaxIdleConnDuration time.Duration 1709 ReadBufferSize int 1710 WriteBufferSize int 1711 ReadTimeout time.Duration 1712 WriteTimeout time.Duration 1713 Logger Logger 1714 1715 workPool sync.Pool 1716 1717 chLock sync.Mutex 1718 chW chan *pipelineWork 1719 chR chan *pipelineWork 1720 1721 tlsConfigLock sync.Mutex 1722 tlsConfig *tls.Config 1723} 1724 1725type pipelineWork struct { 1726 reqCopy Request 1727 respCopy Response 1728 req *Request 1729 resp *Response 1730 t *time.Timer 1731 deadline time.Time 1732 err error 1733 done chan struct{} 1734} 1735 1736// DoTimeout performs the given request and waits for response during 1737// the given timeout duration. 
1738// 1739// Request must contain at least non-zero RequestURI with full url (including 1740// scheme and host) or non-zero Host header + RequestURI. 1741// 1742// The function doesn't follow redirects. 1743// 1744// Response is ignored if resp is nil. 1745// 1746// ErrTimeout is returned if the response wasn't returned during 1747// the given timeout. 1748// 1749// It is recommended obtaining req and resp via AcquireRequest 1750// and AcquireResponse in performance-critical code. 1751// 1752// Warning: DoTimeout does not terminate the request itself. The request will 1753// continue in the background and the response will be discarded. 1754// If requests take too long and the connection pool gets filled up please 1755// try setting a ReadTimeout. 1756func (c *PipelineClient) DoTimeout(req *Request, resp *Response, timeout time.Duration) error { 1757 return c.DoDeadline(req, resp, time.Now().Add(timeout)) 1758} 1759 1760// DoDeadline performs the given request and waits for response until 1761// the given deadline. 1762// 1763// Request must contain at least non-zero RequestURI with full url (including 1764// scheme and host) or non-zero Host header + RequestURI. 1765// 1766// The function doesn't follow redirects. 1767// 1768// Response is ignored if resp is nil. 1769// 1770// ErrTimeout is returned if the response wasn't returned until 1771// the given deadline. 1772// 1773// It is recommended obtaining req and resp via AcquireRequest 1774// and AcquireResponse in performance-critical code. 
1775func (c *PipelineClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error { 1776 return c.getConnClient().DoDeadline(req, resp, deadline) 1777} 1778 1779func (c *pipelineConnClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error { 1780 c.init() 1781 1782 timeout := -time.Since(deadline) 1783 if timeout < 0 { 1784 return ErrTimeout 1785 } 1786 1787 w := acquirePipelineWork(&c.workPool, timeout) 1788 w.req = &w.reqCopy 1789 w.resp = &w.respCopy 1790 1791 // Make a copy of the request in order to avoid data races on timeouts 1792 req.copyToSkipBody(&w.reqCopy) 1793 swapRequestBody(req, &w.reqCopy) 1794 1795 // Put the request to outgoing queue 1796 select { 1797 case c.chW <- w: 1798 // Fast path: len(c.ch) < cap(c.ch) 1799 default: 1800 // Slow path 1801 select { 1802 case c.chW <- w: 1803 case <-w.t.C: 1804 releasePipelineWork(&c.workPool, w) 1805 return ErrTimeout 1806 } 1807 } 1808 1809 // Wait for the response 1810 var err error 1811 select { 1812 case <-w.done: 1813 if resp != nil { 1814 w.respCopy.copyToSkipBody(resp) 1815 swapResponseBody(resp, &w.respCopy) 1816 } 1817 err = w.err 1818 releasePipelineWork(&c.workPool, w) 1819 case <-w.t.C: 1820 err = ErrTimeout 1821 } 1822 1823 return err 1824} 1825 1826// Do performs the given http request and sets the corresponding response. 1827// 1828// Request must contain at least non-zero RequestURI with full url (including 1829// scheme and host) or non-zero Host header + RequestURI. 1830// 1831// The function doesn't follow redirects. Use Get* for following redirects. 1832// 1833// Response is ignored if resp is nil. 1834// 1835// It is recommended obtaining req and resp via AcquireRequest 1836// and AcquireResponse in performance-critical code. 
1837func (c *PipelineClient) Do(req *Request, resp *Response) error { 1838 return c.getConnClient().Do(req, resp) 1839} 1840 1841func (c *pipelineConnClient) Do(req *Request, resp *Response) error { 1842 c.init() 1843 1844 w := acquirePipelineWork(&c.workPool, 0) 1845 w.req = req 1846 if resp != nil { 1847 w.resp = resp 1848 } else { 1849 w.resp = &w.respCopy 1850 } 1851 1852 // Put the request to outgoing queue 1853 select { 1854 case c.chW <- w: 1855 default: 1856 // Try substituting the oldest w with the current one. 1857 select { 1858 case wOld := <-c.chW: 1859 wOld.err = ErrPipelineOverflow 1860 wOld.done <- struct{}{} 1861 default: 1862 } 1863 select { 1864 case c.chW <- w: 1865 default: 1866 releasePipelineWork(&c.workPool, w) 1867 return ErrPipelineOverflow 1868 } 1869 } 1870 1871 // Wait for the response 1872 <-w.done 1873 err := w.err 1874 1875 releasePipelineWork(&c.workPool, w) 1876 1877 return err 1878} 1879 1880func (c *PipelineClient) getConnClient() *pipelineConnClient { 1881 c.connClientsLock.Lock() 1882 cc := c.getConnClientUnlocked() 1883 c.connClientsLock.Unlock() 1884 return cc 1885} 1886 1887func (c *PipelineClient) getConnClientUnlocked() *pipelineConnClient { 1888 if len(c.connClients) == 0 { 1889 return c.newConnClient() 1890 } 1891 1892 // Return the client with the minimum number of pending requests. 
1893 minCC := c.connClients[0] 1894 minReqs := minCC.PendingRequests() 1895 if minReqs == 0 { 1896 return minCC 1897 } 1898 for i := 1; i < len(c.connClients); i++ { 1899 cc := c.connClients[i] 1900 reqs := cc.PendingRequests() 1901 if reqs == 0 { 1902 return cc 1903 } 1904 if reqs < minReqs { 1905 minCC = cc 1906 minReqs = reqs 1907 } 1908 } 1909 1910 maxConns := c.MaxConns 1911 if maxConns <= 0 { 1912 maxConns = 1 1913 } 1914 if len(c.connClients) < maxConns { 1915 return c.newConnClient() 1916 } 1917 return minCC 1918} 1919 1920func (c *PipelineClient) newConnClient() *pipelineConnClient { 1921 cc := &pipelineConnClient{ 1922 Addr: c.Addr, 1923 MaxPendingRequests: c.MaxPendingRequests, 1924 MaxBatchDelay: c.MaxBatchDelay, 1925 Dial: c.Dial, 1926 DialDualStack: c.DialDualStack, 1927 IsTLS: c.IsTLS, 1928 TLSConfig: c.TLSConfig, 1929 MaxIdleConnDuration: c.MaxIdleConnDuration, 1930 ReadBufferSize: c.ReadBufferSize, 1931 WriteBufferSize: c.WriteBufferSize, 1932 ReadTimeout: c.ReadTimeout, 1933 WriteTimeout: c.WriteTimeout, 1934 Logger: c.Logger, 1935 } 1936 c.connClients = append(c.connClients, cc) 1937 return cc 1938} 1939 1940// ErrPipelineOverflow may be returned from PipelineClient.Do* 1941// if the requests' queue is overflown. 1942var ErrPipelineOverflow = errors.New("pipelined requests' queue has been overflown. Increase MaxConns and/or MaxPendingRequests") 1943 1944// DefaultMaxPendingRequests is the default value 1945// for PipelineClient.MaxPendingRequests. 
1946const DefaultMaxPendingRequests = 1024 1947 1948func (c *pipelineConnClient) init() { 1949 c.chLock.Lock() 1950 if c.chR == nil { 1951 maxPendingRequests := c.MaxPendingRequests 1952 if maxPendingRequests <= 0 { 1953 maxPendingRequests = DefaultMaxPendingRequests 1954 } 1955 c.chR = make(chan *pipelineWork, maxPendingRequests) 1956 if c.chW == nil { 1957 c.chW = make(chan *pipelineWork, maxPendingRequests) 1958 } 1959 go func() { 1960 if err := c.worker(); err != nil { 1961 c.logger().Printf("error in PipelineClient(%q): %s", c.Addr, err) 1962 if netErr, ok := err.(net.Error); ok && netErr.Temporary() { 1963 // Throttle client reconnections on temporary errors 1964 time.Sleep(time.Second) 1965 } 1966 } 1967 1968 c.chLock.Lock() 1969 // Do not reset c.chW to nil, since it may contain 1970 // pending requests, which could be served on the next 1971 // connection to the host. 1972 c.chR = nil 1973 c.chLock.Unlock() 1974 }() 1975 } 1976 c.chLock.Unlock() 1977} 1978 1979func (c *pipelineConnClient) worker() error { 1980 tlsConfig := c.cachedTLSConfig() 1981 conn, err := dialAddr(c.Addr, c.Dial, c.DialDualStack, c.IsTLS, tlsConfig) 1982 if err != nil { 1983 return err 1984 } 1985 1986 // Start reader and writer 1987 stopW := make(chan struct{}) 1988 doneW := make(chan error) 1989 go func() { 1990 doneW <- c.writer(conn, stopW) 1991 }() 1992 stopR := make(chan struct{}) 1993 doneR := make(chan error) 1994 go func() { 1995 doneR <- c.reader(conn, stopR) 1996 }() 1997 1998 // Wait until reader and writer are stopped 1999 select { 2000 case err = <-doneW: 2001 conn.Close() 2002 close(stopR) 2003 <-doneR 2004 case err = <-doneR: 2005 conn.Close() 2006 close(stopW) 2007 <-doneW 2008 } 2009 2010 // Notify pending readers 2011 for len(c.chR) > 0 { 2012 w := <-c.chR 2013 w.err = errPipelineConnStopped 2014 w.done <- struct{}{} 2015 } 2016 2017 return err 2018} 2019 2020func (c *pipelineConnClient) cachedTLSConfig() *tls.Config { 2021 if !c.IsTLS { 2022 return nil 2023 } 2024 
2025 c.tlsConfigLock.Lock() 2026 cfg := c.tlsConfig 2027 if cfg == nil { 2028 cfg = newClientTLSConfig(c.TLSConfig, c.Addr) 2029 c.tlsConfig = cfg 2030 } 2031 c.tlsConfigLock.Unlock() 2032 2033 return cfg 2034} 2035 2036func (c *pipelineConnClient) writer(conn net.Conn, stopCh <-chan struct{}) error { 2037 writeBufferSize := c.WriteBufferSize 2038 if writeBufferSize <= 0 { 2039 writeBufferSize = defaultWriteBufferSize 2040 } 2041 bw := bufio.NewWriterSize(conn, writeBufferSize) 2042 defer bw.Flush() 2043 chR := c.chR 2044 chW := c.chW 2045 writeTimeout := c.WriteTimeout 2046 2047 maxIdleConnDuration := c.MaxIdleConnDuration 2048 if maxIdleConnDuration <= 0 { 2049 maxIdleConnDuration = DefaultMaxIdleConnDuration 2050 } 2051 maxBatchDelay := c.MaxBatchDelay 2052 2053 var ( 2054 stopTimer = time.NewTimer(time.Hour) 2055 flushTimer = time.NewTimer(time.Hour) 2056 flushTimerCh <-chan time.Time 2057 instantTimerCh = make(chan time.Time) 2058 2059 w *pipelineWork 2060 err error 2061 ) 2062 close(instantTimerCh) 2063 for { 2064 againChW: 2065 select { 2066 case w = <-chW: 2067 // Fast path: len(chW) > 0 2068 default: 2069 // Slow path 2070 stopTimer.Reset(maxIdleConnDuration) 2071 select { 2072 case w = <-chW: 2073 case <-stopTimer.C: 2074 return nil 2075 case <-stopCh: 2076 return nil 2077 case <-flushTimerCh: 2078 if err = bw.Flush(); err != nil { 2079 return err 2080 } 2081 flushTimerCh = nil 2082 goto againChW 2083 } 2084 } 2085 2086 if !w.deadline.IsZero() && time.Since(w.deadline) >= 0 { 2087 w.err = ErrTimeout 2088 w.done <- struct{}{} 2089 continue 2090 } 2091 2092 w.resp.parseNetConn(conn) 2093 2094 if writeTimeout > 0 { 2095 // Set Deadline every time, since golang has fixed the performance issue 2096 // See https://github.com/golang/go/issues/15133#issuecomment-271571395 for details 2097 currentTime := time.Now() 2098 if err = conn.SetWriteDeadline(currentTime.Add(writeTimeout)); err != nil { 2099 w.err = err 2100 w.done <- struct{}{} 2101 return err 2102 } 2103 
} 2104 if err = w.req.Write(bw); err != nil { 2105 w.err = err 2106 w.done <- struct{}{} 2107 return err 2108 } 2109 if flushTimerCh == nil && (len(chW) == 0 || len(chR) == cap(chR)) { 2110 if maxBatchDelay > 0 { 2111 flushTimer.Reset(maxBatchDelay) 2112 flushTimerCh = flushTimer.C 2113 } else { 2114 flushTimerCh = instantTimerCh 2115 } 2116 } 2117 2118 againChR: 2119 select { 2120 case chR <- w: 2121 // Fast path: len(chR) < cap(chR) 2122 default: 2123 // Slow path 2124 select { 2125 case chR <- w: 2126 case <-stopCh: 2127 w.err = errPipelineConnStopped 2128 w.done <- struct{}{} 2129 return nil 2130 case <-flushTimerCh: 2131 if err = bw.Flush(); err != nil { 2132 w.err = err 2133 w.done <- struct{}{} 2134 return err 2135 } 2136 flushTimerCh = nil 2137 goto againChR 2138 } 2139 } 2140 } 2141} 2142 2143func (c *pipelineConnClient) reader(conn net.Conn, stopCh <-chan struct{}) error { 2144 readBufferSize := c.ReadBufferSize 2145 if readBufferSize <= 0 { 2146 readBufferSize = defaultReadBufferSize 2147 } 2148 br := bufio.NewReaderSize(conn, readBufferSize) 2149 chR := c.chR 2150 readTimeout := c.ReadTimeout 2151 2152 var ( 2153 w *pipelineWork 2154 err error 2155 ) 2156 for { 2157 select { 2158 case w = <-chR: 2159 // Fast path: len(chR) > 0 2160 default: 2161 // Slow path 2162 select { 2163 case w = <-chR: 2164 case <-stopCh: 2165 return nil 2166 } 2167 } 2168 2169 if readTimeout > 0 { 2170 // Set Deadline every time, since golang has fixed the performance issue 2171 // See https://github.com/golang/go/issues/15133#issuecomment-271571395 for details 2172 currentTime := time.Now() 2173 if err = conn.SetReadDeadline(currentTime.Add(readTimeout)); err != nil { 2174 w.err = err 2175 w.done <- struct{}{} 2176 return err 2177 } 2178 } 2179 if err = w.resp.Read(br); err != nil { 2180 w.err = err 2181 w.done <- struct{}{} 2182 return err 2183 } 2184 2185 w.done <- struct{}{} 2186 } 2187} 2188 2189func (c *pipelineConnClient) logger() Logger { 2190 if c.Logger != nil { 2191 
return c.Logger 2192 } 2193 return defaultLogger 2194} 2195 2196// PendingRequests returns the current number of pending requests pipelined 2197// to the server. 2198// 2199// This number may exceed MaxPendingRequests*MaxConns by up to two times, since 2200// each connection to the server may keep up to MaxPendingRequests requests 2201// in the queue before sending them to the server. 2202// 2203// This function may be used for balancing load among multiple PipelineClient 2204// instances. 2205func (c *PipelineClient) PendingRequests() int { 2206 c.connClientsLock.Lock() 2207 n := 0 2208 for _, cc := range c.connClients { 2209 n += cc.PendingRequests() 2210 } 2211 c.connClientsLock.Unlock() 2212 return n 2213} 2214 2215func (c *pipelineConnClient) PendingRequests() int { 2216 c.init() 2217 2218 c.chLock.Lock() 2219 n := len(c.chR) + len(c.chW) 2220 c.chLock.Unlock() 2221 return n 2222} 2223 2224var errPipelineConnStopped = errors.New("pipeline connection has been stopped") 2225 2226func acquirePipelineWork(pool *sync.Pool, timeout time.Duration) *pipelineWork { 2227 v := pool.Get() 2228 if v == nil { 2229 v = &pipelineWork{ 2230 done: make(chan struct{}, 1), 2231 } 2232 } 2233 w := v.(*pipelineWork) 2234 if timeout > 0 { 2235 if w.t == nil { 2236 w.t = time.NewTimer(timeout) 2237 } else { 2238 w.t.Reset(timeout) 2239 } 2240 w.deadline = time.Now().Add(timeout) 2241 } else { 2242 w.deadline = zeroTime 2243 } 2244 return w 2245} 2246 2247func releasePipelineWork(pool *sync.Pool, w *pipelineWork) { 2248 if w.t != nil { 2249 w.t.Stop() 2250 } 2251 w.reqCopy.Reset() 2252 w.respCopy.Reset() 2253 w.req = nil 2254 w.resp = nil 2255 w.err = nil 2256 pool.Put(w) 2257} 2258