1package fasthttp
2
3import (
4	"bufio"
5	"bytes"
6	"crypto/tls"
7	"errors"
8	"fmt"
9	"io"
10	"net"
11	"strings"
12	"sync"
13	"sync/atomic"
14	"time"
15)
16
17// Do performs the given http request and fills the given http response.
18//
19// Request must contain at least non-zero RequestURI with full url (including
20// scheme and host) or non-zero Host header + RequestURI.
21//
22// Client determines the server to be requested in the following order:
23//
24//   - from RequestURI if it contains full url with scheme and host;
25//   - from Host header otherwise.
26//
27// The function doesn't follow redirects. Use Get* for following redirects.
28//
29// Response is ignored if resp is nil.
30//
31// ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
32// to the requested host are busy.
33//
34// It is recommended obtaining req and resp via AcquireRequest
35// and AcquireResponse in performance-critical code.
36func Do(req *Request, resp *Response) error {
37	return defaultClient.Do(req, resp)
38}
39
40// DoTimeout performs the given request and waits for response during
41// the given timeout duration.
42//
43// Request must contain at least non-zero RequestURI with full url (including
44// scheme and host) or non-zero Host header + RequestURI.
45//
46// Client determines the server to be requested in the following order:
47//
48//   - from RequestURI if it contains full url with scheme and host;
49//   - from Host header otherwise.
50//
51// The function doesn't follow redirects. Use Get* for following redirects.
52//
53// Response is ignored if resp is nil.
54//
55// ErrTimeout is returned if the response wasn't returned during
56// the given timeout.
57//
58// ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
59// to the requested host are busy.
60//
61// It is recommended obtaining req and resp via AcquireRequest
62// and AcquireResponse in performance-critical code.
63//
64// Warning: DoTimeout does not terminate the request itself. The request will
65// continue in the background and the response will be discarded.
66// If requests take too long and the connection pool gets filled up please
67// try using a Client and setting a ReadTimeout.
68func DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
69	return defaultClient.DoTimeout(req, resp, timeout)
70}
71
72// DoDeadline performs the given request and waits for response until
73// the given deadline.
74//
75// Request must contain at least non-zero RequestURI with full url (including
76// scheme and host) or non-zero Host header + RequestURI.
77//
78// Client determines the server to be requested in the following order:
79//
80//   - from RequestURI if it contains full url with scheme and host;
81//   - from Host header otherwise.
82//
83// The function doesn't follow redirects. Use Get* for following redirects.
84//
85// Response is ignored if resp is nil.
86//
87// ErrTimeout is returned if the response wasn't returned until
88// the given deadline.
89//
90// ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
91// to the requested host are busy.
92//
93// It is recommended obtaining req and resp via AcquireRequest
94// and AcquireResponse in performance-critical code.
95func DoDeadline(req *Request, resp *Response, deadline time.Time) error {
96	return defaultClient.DoDeadline(req, resp, deadline)
97}
98
99// Get returns the status code and body of url.
100//
101// The contents of dst will be replaced by the body and returned, if the dst
102// is too small a new slice will be allocated.
103//
104// The function follows redirects. Use Do* for manually handling redirects.
105func Get(dst []byte, url string) (statusCode int, body []byte, err error) {
106	return defaultClient.Get(dst, url)
107}
108
109// GetTimeout returns the status code and body of url.
110//
111// The contents of dst will be replaced by the body and returned, if the dst
112// is too small a new slice will be allocated.
113//
114// The function follows redirects. Use Do* for manually handling redirects.
115//
116// ErrTimeout error is returned if url contents couldn't be fetched
117// during the given timeout.
118func GetTimeout(dst []byte, url string, timeout time.Duration) (statusCode int, body []byte, err error) {
119	return defaultClient.GetTimeout(dst, url, timeout)
120}
121
122// GetDeadline returns the status code and body of url.
123//
124// The contents of dst will be replaced by the body and returned, if the dst
125// is too small a new slice will be allocated.
126//
127// The function follows redirects. Use Do* for manually handling redirects.
128//
129// ErrTimeout error is returned if url contents couldn't be fetched
130// until the given deadline.
131func GetDeadline(dst []byte, url string, deadline time.Time) (statusCode int, body []byte, err error) {
132	return defaultClient.GetDeadline(dst, url, deadline)
133}
134
135// Post sends POST request to the given url with the given POST arguments.
136//
137// The contents of dst will be replaced by the body and returned, if the dst
138// is too small a new slice will be allocated.
139//
140// The function follows redirects. Use Do* for manually handling redirects.
141//
142// Empty POST body is sent if postArgs is nil.
143func Post(dst []byte, url string, postArgs *Args) (statusCode int, body []byte, err error) {
144	return defaultClient.Post(dst, url, postArgs)
145}
146
147var defaultClient Client
148
149// Client implements http client.
150//
151// Copying Client by value is prohibited. Create new instance instead.
152//
153// It is safe calling Client methods from concurrently running goroutines.
154type Client struct {
155	noCopy noCopy
156
157	// Client name. Used in User-Agent request header.
158	//
159	// Default client name is used if not set.
160	Name string
161
162	// NoDefaultUserAgentHeader when set to true, causes the default
163	// User-Agent header to be excluded from the Request.
164	NoDefaultUserAgentHeader bool
165
166	// Callback for establishing new connections to hosts.
167	//
168	// Default Dial is used if not set.
169	Dial DialFunc
170
171	// Attempt to connect to both ipv4 and ipv6 addresses if set to true.
172	//
173	// This option is used only if default TCP dialer is used,
174	// i.e. if Dial is blank.
175	//
176	// By default client connects only to ipv4 addresses,
177	// since unfortunately ipv6 remains broken in many networks worldwide :)
178	DialDualStack bool
179
180	// TLS config for https connections.
181	//
182	// Default TLS config is used if not set.
183	TLSConfig *tls.Config
184
185	// Maximum number of connections per each host which may be established.
186	//
187	// DefaultMaxConnsPerHost is used if not set.
188	MaxConnsPerHost int
189
190	// Idle keep-alive connections are closed after this duration.
191	//
192	// By default idle connections are closed
193	// after DefaultMaxIdleConnDuration.
194	MaxIdleConnDuration time.Duration
195
196	// Maximum number of attempts for idempotent calls
197	//
198	// DefaultMaxIdemponentCallAttempts is used if not set.
199	MaxIdemponentCallAttempts int
200
201	// Per-connection buffer size for responses' reading.
202	// This also limits the maximum header size.
203	//
204	// Default buffer size is used if 0.
205	ReadBufferSize int
206
207	// Per-connection buffer size for requests' writing.
208	//
209	// Default buffer size is used if 0.
210	WriteBufferSize int
211
212	// Maximum duration for full response reading (including body).
213	//
214	// By default response read timeout is unlimited.
215	ReadTimeout time.Duration
216
217	// Maximum duration for full request writing (including body).
218	//
219	// By default request write timeout is unlimited.
220	WriteTimeout time.Duration
221
222	// Maximum response body size.
223	//
224	// The client returns ErrBodyTooLarge if this limit is greater than 0
225	// and response body is greater than the limit.
226	//
227	// By default response body size is unlimited.
228	MaxResponseBodySize int
229
230	// Header names are passed as-is without normalization
231	// if this option is set.
232	//
233	// Disabled header names' normalization may be useful only for proxying
234	// responses to other clients expecting case-sensitive
235	// header names. See https://github.com/valyala/fasthttp/issues/57
236	// for details.
237	//
238	// By default request and response header names are normalized, i.e.
239	// The first letter and the first letters following dashes
240	// are uppercased, while all the other letters are lowercased.
241	// Examples:
242	//
243	//     * HOST -> Host
244	//     * content-type -> Content-Type
245	//     * cONTENT-lenGTH -> Content-Length
246	DisableHeaderNamesNormalizing bool
247
248	mLock sync.Mutex
249	m     map[string]*HostClient
250	ms    map[string]*HostClient
251}
252
253// Get returns the status code and body of url.
254//
255// The contents of dst will be replaced by the body and returned, if the dst
256// is too small a new slice will be allocated.
257//
258// The function follows redirects. Use Do* for manually handling redirects.
259func (c *Client) Get(dst []byte, url string) (statusCode int, body []byte, err error) {
260	return clientGetURL(dst, url, c)
261}
262
263// GetTimeout returns the status code and body of url.
264//
265// The contents of dst will be replaced by the body and returned, if the dst
266// is too small a new slice will be allocated.
267//
268// The function follows redirects. Use Do* for manually handling redirects.
269//
270// ErrTimeout error is returned if url contents couldn't be fetched
271// during the given timeout.
272func (c *Client) GetTimeout(dst []byte, url string, timeout time.Duration) (statusCode int, body []byte, err error) {
273	return clientGetURLTimeout(dst, url, timeout, c)
274}
275
276// GetDeadline returns the status code and body of url.
277//
278// The contents of dst will be replaced by the body and returned, if the dst
279// is too small a new slice will be allocated.
280//
281// The function follows redirects. Use Do* for manually handling redirects.
282//
283// ErrTimeout error is returned if url contents couldn't be fetched
284// until the given deadline.
285func (c *Client) GetDeadline(dst []byte, url string, deadline time.Time) (statusCode int, body []byte, err error) {
286	return clientGetURLDeadline(dst, url, deadline, c)
287}
288
289// Post sends POST request to the given url with the given POST arguments.
290//
291// The contents of dst will be replaced by the body and returned, if the dst
292// is too small a new slice will be allocated.
293//
294// The function follows redirects. Use Do* for manually handling redirects.
295//
296// Empty POST body is sent if postArgs is nil.
297func (c *Client) Post(dst []byte, url string, postArgs *Args) (statusCode int, body []byte, err error) {
298	return clientPostURL(dst, url, postArgs, c)
299}
300
301// DoTimeout performs the given request and waits for response during
302// the given timeout duration.
303//
304// Request must contain at least non-zero RequestURI with full url (including
305// scheme and host) or non-zero Host header + RequestURI.
306//
307// Client determines the server to be requested in the following order:
308//
309//   - from RequestURI if it contains full url with scheme and host;
310//   - from Host header otherwise.
311//
312// The function doesn't follow redirects. Use Get* for following redirects.
313//
314// Response is ignored if resp is nil.
315//
316// ErrTimeout is returned if the response wasn't returned during
317// the given timeout.
318//
319// ErrNoFreeConns is returned if all Client.MaxConnsPerHost connections
320// to the requested host are busy.
321//
322// It is recommended obtaining req and resp via AcquireRequest
323// and AcquireResponse in performance-critical code.
324//
325// Warning: DoTimeout does not terminate the request itself. The request will
326// continue in the background and the response will be discarded.
327// If requests take too long and the connection pool gets filled up please
328// try setting a ReadTimeout.
329func (c *Client) DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
330	return clientDoTimeout(req, resp, timeout, c)
331}
332
333// DoDeadline performs the given request and waits for response until
334// the given deadline.
335//
336// Request must contain at least non-zero RequestURI with full url (including
337// scheme and host) or non-zero Host header + RequestURI.
338//
339// Client determines the server to be requested in the following order:
340//
341//   - from RequestURI if it contains full url with scheme and host;
342//   - from Host header otherwise.
343//
344// The function doesn't follow redirects. Use Get* for following redirects.
345//
346// Response is ignored if resp is nil.
347//
348// ErrTimeout is returned if the response wasn't returned until
349// the given deadline.
350//
351// ErrNoFreeConns is returned if all Client.MaxConnsPerHost connections
352// to the requested host are busy.
353//
354// It is recommended obtaining req and resp via AcquireRequest
355// and AcquireResponse in performance-critical code.
356func (c *Client) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
357	return clientDoDeadline(req, resp, deadline, c)
358}
359
360// Do performs the given http request and fills the given http response.
361//
362// Request must contain at least non-zero RequestURI with full url (including
363// scheme and host) or non-zero Host header + RequestURI.
364//
365// Client determines the server to be requested in the following order:
366//
367//   - from RequestURI if it contains full url with scheme and host;
368//   - from Host header otherwise.
369//
370// Response is ignored if resp is nil.
371//
372// The function doesn't follow redirects. Use Get* for following redirects.
373//
374// ErrNoFreeConns is returned if all Client.MaxConnsPerHost connections
375// to the requested host are busy.
376//
377// It is recommended obtaining req and resp via AcquireRequest
378// and AcquireResponse in performance-critical code.
379func (c *Client) Do(req *Request, resp *Response) error {
380	uri := req.URI()
381	host := uri.Host()
382
383	isTLS := false
384	scheme := uri.Scheme()
385	if bytes.Equal(scheme, strHTTPS) {
386		isTLS = true
387	} else if !bytes.Equal(scheme, strHTTP) {
388		return fmt.Errorf("unsupported protocol %q. http and https are supported", scheme)
389	}
390
391	startCleaner := false
392
393	c.mLock.Lock()
394	m := c.m
395	if isTLS {
396		m = c.ms
397	}
398	if m == nil {
399		m = make(map[string]*HostClient)
400		if isTLS {
401			c.ms = m
402		} else {
403			c.m = m
404		}
405	}
406	hc := m[string(host)]
407	if hc == nil {
408		hc = &HostClient{
409			Addr:                          addMissingPort(string(host), isTLS),
410			Name:                          c.Name,
411			NoDefaultUserAgentHeader:      c.NoDefaultUserAgentHeader,
412			Dial:                          c.Dial,
413			DialDualStack:                 c.DialDualStack,
414			IsTLS:                         isTLS,
415			TLSConfig:                     c.TLSConfig,
416			MaxConns:                      c.MaxConnsPerHost,
417			MaxIdleConnDuration:           c.MaxIdleConnDuration,
418			MaxIdemponentCallAttempts:     c.MaxIdemponentCallAttempts,
419			ReadBufferSize:                c.ReadBufferSize,
420			WriteBufferSize:               c.WriteBufferSize,
421			ReadTimeout:                   c.ReadTimeout,
422			WriteTimeout:                  c.WriteTimeout,
423			MaxResponseBodySize:           c.MaxResponseBodySize,
424			DisableHeaderNamesNormalizing: c.DisableHeaderNamesNormalizing,
425		}
426		m[string(host)] = hc
427		if len(m) == 1 {
428			startCleaner = true
429		}
430	}
431	c.mLock.Unlock()
432
433	if startCleaner {
434		go c.mCleaner(m)
435	}
436
437	return hc.Do(req, resp)
438}
439
440func (c *Client) mCleaner(m map[string]*HostClient) {
441	mustStop := false
442
443	for {
444		c.mLock.Lock()
445		for k, v := range m {
446			v.connsLock.Lock()
447			shouldRemove := v.connsCount == 0
448			v.connsLock.Unlock()
449
450			if shouldRemove {
451				delete(m, k)
452			}
453		}
454		if len(m) == 0 {
455			mustStop = true
456		}
457		c.mLock.Unlock()
458
459		if mustStop {
460			break
461		}
462		time.Sleep(10 * time.Second)
463	}
464}
465
466// DefaultMaxConnsPerHost is the maximum number of concurrent connections
467// http client may establish per host by default (i.e. if
468// Client.MaxConnsPerHost isn't set).
469const DefaultMaxConnsPerHost = 512
470
471// DefaultMaxIdleConnDuration is the default duration before idle keep-alive
472// connection is closed.
473const DefaultMaxIdleConnDuration = 10 * time.Second
474
475// DefaultMaxIdemponentCallAttempts is the default idempotent calls attempts count.
476const DefaultMaxIdemponentCallAttempts = 5
477
478// DialFunc must establish connection to addr.
479//
480// There is no need in establishing TLS (SSL) connection for https.
481// The client automatically converts connection to TLS
482// if HostClient.IsTLS is set.
483//
484// TCP address passed to DialFunc always contains host and port.
485// Example TCP addr values:
486//
487//   - foobar.com:80
488//   - foobar.com:443
489//   - foobar.com:8080
490type DialFunc func(addr string) (net.Conn, error)
491
492// HostClient balances http requests among hosts listed in Addr.
493//
494// HostClient may be used for balancing load among multiple upstream hosts.
495// While multiple addresses passed to HostClient.Addr may be used for balancing
496// load among them, it would be better using LBClient instead, since HostClient
497// may unevenly balance load among upstream hosts.
498//
499// It is forbidden copying HostClient instances. Create new instances instead.
500//
501// It is safe calling HostClient methods from concurrently running goroutines.
502type HostClient struct {
503	noCopy noCopy
504
505	// Comma-separated list of upstream HTTP server host addresses,
506	// which are passed to Dial in a round-robin manner.
507	//
508	// Each address may contain port if default dialer is used.
509	// For example,
510	//
511	//    - foobar.com:80
512	//    - foobar.com:443
513	//    - foobar.com:8080
514	Addr string
515
516	// Client name. Used in User-Agent request header.
517	Name string
518
519	// NoDefaultUserAgentHeader when set to true, causes the default
520	// User-Agent header to be excluded from the Request.
521	NoDefaultUserAgentHeader bool
522
523	// Callback for establishing new connection to the host.
524	//
525	// Default Dial is used if not set.
526	Dial DialFunc
527
528	// Attempt to connect to both ipv4 and ipv6 host addresses
529	// if set to true.
530	//
531	// This option is used only if default TCP dialer is used,
532	// i.e. if Dial is blank.
533	//
534	// By default client connects only to ipv4 addresses,
535	// since unfortunately ipv6 remains broken in many networks worldwide :)
536	DialDualStack bool
537
538	// Whether to use TLS (aka SSL or HTTPS) for host connections.
539	IsTLS bool
540
541	// Optional TLS config.
542	TLSConfig *tls.Config
543
544	// Maximum number of connections which may be established to all hosts
545	// listed in Addr.
546	//
547	// You can change this value while the HostClient is being used
548	// using HostClient.SetMaxConns(value)
549	//
550	// DefaultMaxConnsPerHost is used if not set.
551	MaxConns int
552
553	// Keep-alive connections are closed after this duration.
554	//
555	// By default connection duration is unlimited.
556	MaxConnDuration time.Duration
557
558	// Idle keep-alive connections are closed after this duration.
559	//
560	// By default idle connections are closed
561	// after DefaultMaxIdleConnDuration.
562	MaxIdleConnDuration time.Duration
563
564	// Maximum number of attempts for idempotent calls
565	//
566	// DefaultMaxIdemponentCallAttempts is used if not set.
567	MaxIdemponentCallAttempts int
568
569	// Per-connection buffer size for responses' reading.
570	// This also limits the maximum header size.
571	//
572	// Default buffer size is used if 0.
573	ReadBufferSize int
574
575	// Per-connection buffer size for requests' writing.
576	//
577	// Default buffer size is used if 0.
578	WriteBufferSize int
579
580	// Maximum duration for full response reading (including body).
581	//
582	// By default response read timeout is unlimited.
583	ReadTimeout time.Duration
584
585	// Maximum duration for full request writing (including body).
586	//
587	// By default request write timeout is unlimited.
588	WriteTimeout time.Duration
589
590	// Maximum response body size.
591	//
592	// The client returns ErrBodyTooLarge if this limit is greater than 0
593	// and response body is greater than the limit.
594	//
595	// By default response body size is unlimited.
596	MaxResponseBodySize int
597
598	// Header names are passed as-is without normalization
599	// if this option is set.
600	//
601	// Disabled header names' normalization may be useful only for proxying
602	// responses to other clients expecting case-sensitive
603	// header names. See https://github.com/valyala/fasthttp/issues/57
604	// for details.
605	//
606	// By default request and response header names are normalized, i.e.
607	// The first letter and the first letters following dashes
608	// are uppercased, while all the other letters are lowercased.
609	// Examples:
610	//
611	//     * HOST -> Host
612	//     * content-type -> Content-Type
613	//     * cONTENT-lenGTH -> Content-Length
614	DisableHeaderNamesNormalizing bool
615
616	clientName  atomic.Value
617	lastUseTime uint32
618
619	connsLock  sync.Mutex
620	connsCount int
621	conns      []*clientConn
622
623	addrsLock sync.Mutex
624	addrs     []string
625	addrIdx   uint32
626
627	tlsConfigMap     map[string]*tls.Config
628	tlsConfigMapLock sync.Mutex
629
630	readerPool sync.Pool
631	writerPool sync.Pool
632
633	pendingRequests int32
634
635	connsCleanerRun bool
636}
637
638type clientConn struct {
639	c net.Conn
640
641	createdTime time.Time
642	lastUseTime time.Time
643}
644
645var startTimeUnix = time.Now().Unix()
646
647// LastUseTime returns time the client was last used
648func (c *HostClient) LastUseTime() time.Time {
649	n := atomic.LoadUint32(&c.lastUseTime)
650	return time.Unix(startTimeUnix+int64(n), 0)
651}
652
653// Get returns the status code and body of url.
654//
655// The contents of dst will be replaced by the body and returned, if the dst
656// is too small a new slice will be allocated.
657//
658// The function follows redirects. Use Do* for manually handling redirects.
659func (c *HostClient) Get(dst []byte, url string) (statusCode int, body []byte, err error) {
660	return clientGetURL(dst, url, c)
661}
662
663// GetTimeout returns the status code and body of url.
664//
665// The contents of dst will be replaced by the body and returned, if the dst
666// is too small a new slice will be allocated.
667//
668// The function follows redirects. Use Do* for manually handling redirects.
669//
670// ErrTimeout error is returned if url contents couldn't be fetched
671// during the given timeout.
672func (c *HostClient) GetTimeout(dst []byte, url string, timeout time.Duration) (statusCode int, body []byte, err error) {
673	return clientGetURLTimeout(dst, url, timeout, c)
674}
675
676// GetDeadline returns the status code and body of url.
677//
678// The contents of dst will be replaced by the body and returned, if the dst
679// is too small a new slice will be allocated.
680//
681// The function follows redirects. Use Do* for manually handling redirects.
682//
683// ErrTimeout error is returned if url contents couldn't be fetched
684// until the given deadline.
685func (c *HostClient) GetDeadline(dst []byte, url string, deadline time.Time) (statusCode int, body []byte, err error) {
686	return clientGetURLDeadline(dst, url, deadline, c)
687}
688
689// Post sends POST request to the given url with the given POST arguments.
690//
691// The contents of dst will be replaced by the body and returned, if the dst
692// is too small a new slice will be allocated.
693//
694// The function follows redirects. Use Do* for manually handling redirects.
695//
696// Empty POST body is sent if postArgs is nil.
697func (c *HostClient) Post(dst []byte, url string, postArgs *Args) (statusCode int, body []byte, err error) {
698	return clientPostURL(dst, url, postArgs, c)
699}
700
701type clientDoer interface {
702	Do(req *Request, resp *Response) error
703}
704
705func clientGetURL(dst []byte, url string, c clientDoer) (statusCode int, body []byte, err error) {
706	req := AcquireRequest()
707
708	statusCode, body, err = doRequestFollowRedirects(req, dst, url, c)
709
710	ReleaseRequest(req)
711	return statusCode, body, err
712}
713
714func clientGetURLTimeout(dst []byte, url string, timeout time.Duration, c clientDoer) (statusCode int, body []byte, err error) {
715	deadline := time.Now().Add(timeout)
716	return clientGetURLDeadline(dst, url, deadline, c)
717}
718
719type clientURLResponse struct {
720	statusCode int
721	body       []byte
722	err        error
723}
724
725func clientGetURLDeadline(dst []byte, url string, deadline time.Time, c clientDoer) (statusCode int, body []byte, err error) {
726	timeout := -time.Since(deadline)
727	if timeout <= 0 {
728		return 0, dst, ErrTimeout
729	}
730
731	var ch chan clientURLResponse
732	chv := clientURLResponseChPool.Get()
733	if chv == nil {
734		chv = make(chan clientURLResponse, 1)
735	}
736	ch = chv.(chan clientURLResponse)
737
738	req := AcquireRequest()
739
740	// Note that the request continues execution on ErrTimeout until
741	// client-specific ReadTimeout exceeds. This helps limiting load
742	// on slow hosts by MaxConns* concurrent requests.
743	//
744	// Without this 'hack' the load on slow host could exceed MaxConns*
745	// concurrent requests, since timed out requests on client side
746	// usually continue execution on the host.
747	go func() {
748		statusCodeCopy, bodyCopy, errCopy := doRequestFollowRedirects(req, dst, url, c)
749		ch <- clientURLResponse{
750			statusCode: statusCodeCopy,
751			body:       bodyCopy,
752			err:        errCopy,
753		}
754	}()
755
756	tc := AcquireTimer(timeout)
757	select {
758	case resp := <-ch:
759		ReleaseRequest(req)
760		clientURLResponseChPool.Put(chv)
761		statusCode = resp.statusCode
762		body = resp.body
763		err = resp.err
764	case <-tc.C:
765		body = dst
766		err = ErrTimeout
767	}
768	ReleaseTimer(tc)
769
770	return statusCode, body, err
771}
772
773var clientURLResponseChPool sync.Pool
774
775func clientPostURL(dst []byte, url string, postArgs *Args, c clientDoer) (statusCode int, body []byte, err error) {
776	req := AcquireRequest()
777	req.Header.SetMethodBytes(strPost)
778	req.Header.SetContentTypeBytes(strPostArgsContentType)
779	if postArgs != nil {
780		postArgs.WriteTo(req.BodyWriter())
781	}
782
783	statusCode, body, err = doRequestFollowRedirects(req, dst, url, c)
784
785	ReleaseRequest(req)
786	return statusCode, body, err
787}
788
789var (
790	errMissingLocation  = errors.New("missing Location header for http redirect")
791	errTooManyRedirects = errors.New("too many redirects detected when doing the request")
792)
793
794const maxRedirectsCount = 16
795
796func doRequestFollowRedirects(req *Request, dst []byte, url string, c clientDoer) (statusCode int, body []byte, err error) {
797	resp := AcquireResponse()
798	bodyBuf := resp.bodyBuffer()
799	resp.keepBodyBuffer = true
800	oldBody := bodyBuf.B
801	bodyBuf.B = dst
802	scheme := req.uri.Scheme()
803	req.schemaUpdate = false
804
805	redirectsCount := 0
806	for {
807		// In case redirect to different scheme
808		if redirectsCount > 0 && !bytes.Equal(scheme, req.uri.Scheme()) {
809			if strings.HasPrefix(url, string(strHTTPS)) {
810				req.isTLS = true
811				req.uri.SetSchemeBytes(strHTTPS)
812			} else {
813				req.isTLS = false
814				req.uri.SetSchemeBytes(strHTTP)
815			}
816			scheme = req.uri.Scheme()
817			req.schemaUpdate = true
818		}
819
820		req.parsedURI = false
821		req.Header.host = req.Header.host[:0]
822		req.SetRequestURI(url)
823
824		if err = c.Do(req, resp); err != nil {
825			break
826		}
827		statusCode = resp.Header.StatusCode()
828		if statusCode != StatusMovedPermanently &&
829			statusCode != StatusFound &&
830			statusCode != StatusSeeOther &&
831			statusCode != StatusTemporaryRedirect &&
832			statusCode != StatusPermanentRedirect {
833			break
834		}
835
836		redirectsCount++
837		if redirectsCount > maxRedirectsCount {
838			err = errTooManyRedirects
839			break
840		}
841		location := resp.Header.peek(strLocation)
842		if len(location) == 0 {
843			err = errMissingLocation
844			break
845		}
846		url = getRedirectURL(url, location)
847	}
848
849	body = bodyBuf.B
850	bodyBuf.B = oldBody
851	resp.keepBodyBuffer = false
852	ReleaseResponse(resp)
853
854	return statusCode, body, err
855}
856
857func getRedirectURL(baseURL string, location []byte) string {
858	u := AcquireURI()
859	u.Update(baseURL)
860	u.UpdateBytes(location)
861	redirectURL := u.String()
862	ReleaseURI(u)
863	return redirectURL
864}
865
866var (
867	requestPool  sync.Pool
868	responsePool sync.Pool
869)
870
871// AcquireRequest returns an empty Request instance from request pool.
872//
873// The returned Request instance may be passed to ReleaseRequest when it is
874// no longer needed. This allows Request recycling, reduces GC pressure
875// and usually improves performance.
876func AcquireRequest() *Request {
877	v := requestPool.Get()
878	if v == nil {
879		return &Request{}
880	}
881	return v.(*Request)
882}
883
884// ReleaseRequest returns req acquired via AcquireRequest to request pool.
885//
886// It is forbidden accessing req and/or its' members after returning
887// it to request pool.
888func ReleaseRequest(req *Request) {
889	req.Reset()
890	requestPool.Put(req)
891}
892
893// AcquireResponse returns an empty Response instance from response pool.
894//
895// The returned Response instance may be passed to ReleaseResponse when it is
896// no longer needed. This allows Response recycling, reduces GC pressure
897// and usually improves performance.
898func AcquireResponse() *Response {
899	v := responsePool.Get()
900	if v == nil {
901		return &Response{}
902	}
903	return v.(*Response)
904}
905
906// ReleaseResponse return resp acquired via AcquireResponse to response pool.
907//
908// It is forbidden accessing resp and/or its' members after returning
909// it to response pool.
910func ReleaseResponse(resp *Response) {
911	resp.Reset()
912	responsePool.Put(resp)
913}
914
915// DoTimeout performs the given request and waits for response during
916// the given timeout duration.
917//
918// Request must contain at least non-zero RequestURI with full url (including
919// scheme and host) or non-zero Host header + RequestURI.
920//
921// The function doesn't follow redirects. Use Get* for following redirects.
922//
923// Response is ignored if resp is nil.
924//
925// ErrTimeout is returned if the response wasn't returned during
926// the given timeout.
927//
928// ErrNoFreeConns is returned if all HostClient.MaxConns connections
929// to the host are busy.
930//
931// It is recommended obtaining req and resp via AcquireRequest
932// and AcquireResponse in performance-critical code.
933//
934// Warning: DoTimeout does not terminate the request itself. The request will
935// continue in the background and the response will be discarded.
936// If requests take too long and the connection pool gets filled up please
937// try setting a ReadTimeout.
938func (c *HostClient) DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
939	return clientDoTimeout(req, resp, timeout, c)
940}
941
942// DoDeadline performs the given request and waits for response until
943// the given deadline.
944//
945// Request must contain at least non-zero RequestURI with full url (including
946// scheme and host) or non-zero Host header + RequestURI.
947//
948// The function doesn't follow redirects. Use Get* for following redirects.
949//
950// Response is ignored if resp is nil.
951//
952// ErrTimeout is returned if the response wasn't returned until
953// the given deadline.
954//
955// ErrNoFreeConns is returned if all HostClient.MaxConns connections
956// to the host are busy.
957//
958// It is recommended obtaining req and resp via AcquireRequest
959// and AcquireResponse in performance-critical code.
960func (c *HostClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
961	return clientDoDeadline(req, resp, deadline, c)
962}
963
964func clientDoTimeout(req *Request, resp *Response, timeout time.Duration, c clientDoer) error {
965	deadline := time.Now().Add(timeout)
966	return clientDoDeadline(req, resp, deadline, c)
967}
968
969func clientDoDeadline(req *Request, resp *Response, deadline time.Time, c clientDoer) error {
970	timeout := -time.Since(deadline)
971	if timeout <= 0 {
972		return ErrTimeout
973	}
974
975	var ch chan error
976	chv := errorChPool.Get()
977	if chv == nil {
978		chv = make(chan error, 1)
979	}
980	ch = chv.(chan error)
981
982	// Make req and resp copies, since on timeout they no longer
983	// may be accessed.
984	reqCopy := AcquireRequest()
985	req.copyToSkipBody(reqCopy)
986	swapRequestBody(req, reqCopy)
987	respCopy := AcquireResponse()
988	// Not calling resp.copyToSkipBody(respCopy) here to avoid
989	// unexpected messing with headers
990	respCopy.SkipBody = resp.SkipBody
991
992	// Note that the request continues execution on ErrTimeout until
993	// client-specific ReadTimeout exceeds. This helps limiting load
994	// on slow hosts by MaxConns* concurrent requests.
995	//
996	// Without this 'hack' the load on slow host could exceed MaxConns*
997	// concurrent requests, since timed out requests on client side
998	// usually continue execution on the host.
999
1000	var cleanup int32
1001	go func() {
1002		errDo := c.Do(reqCopy, respCopy)
1003		if atomic.LoadInt32(&cleanup) == 1 {
1004			ReleaseResponse(respCopy)
1005			ReleaseRequest(reqCopy)
1006			errorChPool.Put(chv)
1007		} else {
1008			ch <- errDo
1009		}
1010	}()
1011
1012	tc := AcquireTimer(timeout)
1013	var err error
1014	select {
1015	case err = <-ch:
1016		if resp != nil {
1017			respCopy.copyToSkipBody(resp)
1018			swapResponseBody(resp, respCopy)
1019		}
1020		swapRequestBody(reqCopy, req)
1021		ReleaseResponse(respCopy)
1022		ReleaseRequest(reqCopy)
1023		errorChPool.Put(chv)
1024	case <-tc.C:
1025		atomic.StoreInt32(&cleanup, 1)
1026		err = ErrTimeout
1027	}
1028	ReleaseTimer(tc)
1029
1030	return err
1031}
1032
1033var errorChPool sync.Pool
1034
1035// Do performs the given http request and sets the corresponding response.
1036//
1037// Request must contain at least non-zero RequestURI with full url (including
1038// scheme and host) or non-zero Host header + RequestURI.
1039//
1040// The function doesn't follow redirects. Use Get* for following redirects.
1041//
1042// Response is ignored if resp is nil.
1043//
1044// ErrNoFreeConns is returned if all HostClient.MaxConns connections
1045// to the host are busy.
1046//
1047// It is recommended obtaining req and resp via AcquireRequest
1048// and AcquireResponse in performance-critical code.
1049func (c *HostClient) Do(req *Request, resp *Response) error {
1050	var err error
1051	var retry bool
1052	maxAttempts := c.MaxIdemponentCallAttempts
1053	if maxAttempts <= 0 {
1054		maxAttempts = DefaultMaxIdemponentCallAttempts
1055	}
1056	attempts := 0
1057
1058	atomic.AddInt32(&c.pendingRequests, 1)
1059	for {
1060		retry, err = c.do(req, resp)
1061		if err == nil || !retry {
1062			break
1063		}
1064
1065		if !isIdempotent(req) {
1066			// Retry non-idempotent requests if the server closes
1067			// the connection before sending the response.
1068			//
1069			// This case is possible if the server closes the idle
1070			// keep-alive connection on timeout.
1071			//
1072			// Apache and nginx usually do this.
1073			if err != io.EOF {
1074				break
1075			}
1076		}
1077		attempts++
1078		if attempts >= maxAttempts {
1079			break
1080		}
1081	}
1082	atomic.AddInt32(&c.pendingRequests, -1)
1083
1084	if err == io.EOF {
1085		err = ErrConnectionClosed
1086	}
1087	return err
1088}
1089
1090// PendingRequests returns the current number of requests the client
1091// is executing.
1092//
1093// This function may be used for balancing load among multiple HostClient
1094// instances.
1095func (c *HostClient) PendingRequests() int {
1096	return int(atomic.LoadInt32(&c.pendingRequests))
1097}
1098
1099func isIdempotent(req *Request) bool {
1100	return req.Header.IsGet() || req.Header.IsHead() || req.Header.IsPut()
1101}
1102
1103func (c *HostClient) do(req *Request, resp *Response) (bool, error) {
1104	nilResp := false
1105	if resp == nil {
1106		nilResp = true
1107		resp = AcquireResponse()
1108	}
1109
1110	ok, err := c.doNonNilReqResp(req, resp)
1111
1112	if nilResp {
1113		ReleaseResponse(resp)
1114	}
1115
1116	return ok, err
1117}
1118
1119func (c *HostClient) doNonNilReqResp(req *Request, resp *Response) (bool, error) {
1120	if req == nil {
1121		panic("BUG: req cannot be nil")
1122	}
1123	if resp == nil {
1124		panic("BUG: resp cannot be nil")
1125	}
1126
1127	atomic.StoreUint32(&c.lastUseTime, uint32(time.Now().Unix()-startTimeUnix))
1128
1129	// Free up resources occupied by response before sending the request,
1130	// so the GC may reclaim these resources (e.g. response body).
1131	resp.Reset()
1132
1133	// If we detected a redirect to another schema
1134	if req.schemaUpdate {
1135		c.IsTLS = bytes.Equal(req.URI().Scheme(), strHTTPS)
1136		c.Addr = addMissingPort(string(req.Host()), c.IsTLS)
1137		c.addrIdx = 0
1138		c.addrs = nil
1139		req.schemaUpdate = false
1140		req.SetConnectionClose()
1141	}
1142
1143	cc, err := c.acquireConn()
1144	if err != nil {
1145		return false, err
1146	}
1147	conn := cc.c
1148
1149	resp.parseNetConn(conn)
1150
1151	if c.WriteTimeout > 0 {
1152		// Set Deadline every time, since golang has fixed the performance issue
1153		// See https://github.com/golang/go/issues/15133#issuecomment-271571395 for details
1154		currentTime := time.Now()
1155		if err = conn.SetWriteDeadline(currentTime.Add(c.WriteTimeout)); err != nil {
1156			c.closeConn(cc)
1157			return true, err
1158		}
1159	}
1160
1161	resetConnection := false
1162	if c.MaxConnDuration > 0 && time.Since(cc.createdTime) > c.MaxConnDuration && !req.ConnectionClose() {
1163		req.SetConnectionClose()
1164		resetConnection = true
1165	}
1166
1167	userAgentOld := req.Header.UserAgent()
1168	if len(userAgentOld) == 0 {
1169		req.Header.userAgent = c.getClientName()
1170	}
1171	bw := c.acquireWriter(conn)
1172	err = req.Write(bw)
1173
1174	if resetConnection {
1175		req.Header.ResetConnectionClose()
1176	}
1177
1178	if err == nil {
1179		err = bw.Flush()
1180	}
1181	if err != nil {
1182		c.releaseWriter(bw)
1183		c.closeConn(cc)
1184		return true, err
1185	}
1186	c.releaseWriter(bw)
1187
1188	if c.ReadTimeout > 0 {
1189		// Set Deadline every time, since golang has fixed the performance issue
1190		// See https://github.com/golang/go/issues/15133#issuecomment-271571395 for details
1191		currentTime := time.Now()
1192		if err = conn.SetReadDeadline(currentTime.Add(c.ReadTimeout)); err != nil {
1193			c.closeConn(cc)
1194			return true, err
1195		}
1196	}
1197
1198	if !req.Header.IsGet() && req.Header.IsHead() {
1199		resp.SkipBody = true
1200	}
1201	if c.DisableHeaderNamesNormalizing {
1202		resp.Header.DisableNormalizing()
1203	}
1204
1205	br := c.acquireReader(conn)
1206	if err = resp.ReadLimitBody(br, c.MaxResponseBodySize); err != nil {
1207		c.releaseReader(br)
1208		c.closeConn(cc)
1209		// Don't retry in case of ErrBodyTooLarge since we will just get the same again.
1210		retry := err != ErrBodyTooLarge
1211		return retry, err
1212	}
1213	c.releaseReader(br)
1214
1215	if resetConnection || req.ConnectionClose() || resp.ConnectionClose() {
1216		c.closeConn(cc)
1217	} else {
1218		c.releaseConn(cc)
1219	}
1220
1221	return false, err
1222}
1223
1224var (
1225	// ErrNoFreeConns is returned when no free connections available
1226	// to the given host.
1227	//
1228	// Increase the allowed number of connections per host if you
1229	// see this error.
1230	ErrNoFreeConns = errors.New("no free connections available to host")
1231
1232	// ErrTimeout is returned from timed out calls.
1233	ErrTimeout = errors.New("timeout")
1234
1235	// ErrConnectionClosed may be returned from client methods if the server
1236	// closes connection before returning the first response byte.
1237	//
1238	// If you see this error, then either fix the server by returning
1239	// 'Connection: close' response header before closing the connection
1240	// or add 'Connection: close' request header before sending requests
1241	// to broken server.
1242	ErrConnectionClosed = errors.New("the server closed connection before returning the first response byte. " +
1243		"Make sure the server returns 'Connection: close' response header before closing the connection")
1244)
1245
1246func (c *HostClient) SetMaxConns(newMaxConns int) {
1247	c.connsLock.Lock()
1248	c.MaxConns = newMaxConns
1249	c.connsLock.Unlock()
1250}
1251
1252func (c *HostClient) acquireConn() (*clientConn, error) {
1253	var cc *clientConn
1254	createConn := false
1255	startCleaner := false
1256
1257	var n int
1258	c.connsLock.Lock()
1259	n = len(c.conns)
1260	if n == 0 {
1261		maxConns := c.MaxConns
1262		if maxConns <= 0 {
1263			maxConns = DefaultMaxConnsPerHost
1264		}
1265		if c.connsCount < maxConns {
1266			c.connsCount++
1267			createConn = true
1268			if !c.connsCleanerRun {
1269				startCleaner = true
1270				c.connsCleanerRun = true
1271			}
1272		}
1273	} else {
1274		n--
1275		cc = c.conns[n]
1276		c.conns[n] = nil
1277		c.conns = c.conns[:n]
1278	}
1279	c.connsLock.Unlock()
1280
1281	if cc != nil {
1282		return cc, nil
1283	}
1284	if !createConn {
1285		return nil, ErrNoFreeConns
1286	}
1287
1288	if startCleaner {
1289		go c.connsCleaner()
1290	}
1291
1292	conn, err := c.dialHostHard()
1293	if err != nil {
1294		c.decConnsCount()
1295		return nil, err
1296	}
1297	cc = acquireClientConn(conn)
1298
1299	return cc, nil
1300}
1301
1302func (c *HostClient) connsCleaner() {
1303	var (
1304		scratch             []*clientConn
1305		maxIdleConnDuration = c.MaxIdleConnDuration
1306	)
1307	if maxIdleConnDuration <= 0 {
1308		maxIdleConnDuration = DefaultMaxIdleConnDuration
1309	}
1310	for {
1311		currentTime := time.Now()
1312
1313		// Determine idle connections to be closed.
1314		c.connsLock.Lock()
1315		conns := c.conns
1316		n := len(conns)
1317		i := 0
1318		for i < n && currentTime.Sub(conns[i].lastUseTime) > maxIdleConnDuration {
1319			i++
1320		}
1321		sleepFor := maxIdleConnDuration
1322		if i < n {
1323			// + 1 so we actually sleep past the expiration time and not up to it.
1324			// Otherwise the > check above would still fail.
1325			sleepFor = maxIdleConnDuration - currentTime.Sub(conns[i].lastUseTime) + 1
1326		}
1327		scratch = append(scratch[:0], conns[:i]...)
1328		if i > 0 {
1329			m := copy(conns, conns[i:])
1330			for i = m; i < n; i++ {
1331				conns[i] = nil
1332			}
1333			c.conns = conns[:m]
1334		}
1335		c.connsLock.Unlock()
1336
1337		// Close idle connections.
1338		for i, cc := range scratch {
1339			c.closeConn(cc)
1340			scratch[i] = nil
1341		}
1342
1343		// Determine whether to stop the connsCleaner.
1344		c.connsLock.Lock()
1345		mustStop := c.connsCount == 0
1346		if mustStop {
1347			c.connsCleanerRun = false
1348		}
1349		c.connsLock.Unlock()
1350		if mustStop {
1351			break
1352		}
1353
1354		time.Sleep(sleepFor)
1355	}
1356}
1357
1358func (c *HostClient) closeConn(cc *clientConn) {
1359	c.decConnsCount()
1360	cc.c.Close()
1361	releaseClientConn(cc)
1362}
1363
1364func (c *HostClient) decConnsCount() {
1365	c.connsLock.Lock()
1366	c.connsCount--
1367	c.connsLock.Unlock()
1368}
1369
1370func acquireClientConn(conn net.Conn) *clientConn {
1371	v := clientConnPool.Get()
1372	if v == nil {
1373		v = &clientConn{}
1374	}
1375	cc := v.(*clientConn)
1376	cc.c = conn
1377	cc.createdTime = time.Now()
1378	return cc
1379}
1380
1381func releaseClientConn(cc *clientConn) {
1382	// Reset all fields.
1383	*cc = clientConn{}
1384	clientConnPool.Put(cc)
1385}
1386
1387var clientConnPool sync.Pool
1388
1389func (c *HostClient) releaseConn(cc *clientConn) {
1390	cc.lastUseTime = time.Now()
1391	c.connsLock.Lock()
1392	c.conns = append(c.conns, cc)
1393	c.connsLock.Unlock()
1394}
1395
1396func (c *HostClient) acquireWriter(conn net.Conn) *bufio.Writer {
1397	v := c.writerPool.Get()
1398	if v == nil {
1399		n := c.WriteBufferSize
1400		if n <= 0 {
1401			n = defaultWriteBufferSize
1402		}
1403		return bufio.NewWriterSize(conn, n)
1404	}
1405	bw := v.(*bufio.Writer)
1406	bw.Reset(conn)
1407	return bw
1408}
1409
1410func (c *HostClient) releaseWriter(bw *bufio.Writer) {
1411	c.writerPool.Put(bw)
1412}
1413
1414func (c *HostClient) acquireReader(conn net.Conn) *bufio.Reader {
1415	v := c.readerPool.Get()
1416	if v == nil {
1417		n := c.ReadBufferSize
1418		if n <= 0 {
1419			n = defaultReadBufferSize
1420		}
1421		return bufio.NewReaderSize(conn, n)
1422	}
1423	br := v.(*bufio.Reader)
1424	br.Reset(conn)
1425	return br
1426}
1427
1428func (c *HostClient) releaseReader(br *bufio.Reader) {
1429	c.readerPool.Put(br)
1430}
1431
1432func newClientTLSConfig(c *tls.Config, addr string) *tls.Config {
1433	if c == nil {
1434		c = &tls.Config{}
1435	} else {
1436		// TODO: substitute this with c.Clone() after go1.8 becomes mainstream :)
1437		c = &tls.Config{
1438			Rand:              c.Rand,
1439			Time:              c.Time,
1440			Certificates:      c.Certificates,
1441			NameToCertificate: c.NameToCertificate,
1442			GetCertificate:    c.GetCertificate,
1443			RootCAs:           c.RootCAs,
1444			NextProtos:        c.NextProtos,
1445			ServerName:        c.ServerName,
1446
1447			// Do not copy ClientAuth, since it is server-related stuff
1448			// Do not copy ClientCAs, since it is server-related stuff
1449
1450			InsecureSkipVerify: c.InsecureSkipVerify,
1451			CipherSuites:       c.CipherSuites,
1452
1453			// Do not copy PreferServerCipherSuites - this is server stuff
1454
1455			SessionTicketsDisabled: c.SessionTicketsDisabled,
1456
1457			// Do not copy SessionTicketKey - this is server stuff
1458
1459			ClientSessionCache: c.ClientSessionCache,
1460			MinVersion:         c.MinVersion,
1461			MaxVersion:         c.MaxVersion,
1462			CurvePreferences:   c.CurvePreferences,
1463		}
1464	}
1465
1466	if c.ClientSessionCache == nil {
1467		c.ClientSessionCache = tls.NewLRUClientSessionCache(0)
1468	}
1469
1470	if len(c.ServerName) == 0 {
1471		serverName := tlsServerName(addr)
1472		if serverName == "*" {
1473			c.InsecureSkipVerify = true
1474		} else {
1475			c.ServerName = serverName
1476		}
1477	}
1478	return c
1479}
1480
1481func tlsServerName(addr string) string {
1482	if !strings.Contains(addr, ":") {
1483		return addr
1484	}
1485	host, _, err := net.SplitHostPort(addr)
1486	if err != nil {
1487		return "*"
1488	}
1489	return host
1490}
1491
1492func (c *HostClient) nextAddr() string {
1493	c.addrsLock.Lock()
1494	if c.addrs == nil {
1495		c.addrs = strings.Split(c.Addr, ",")
1496	}
1497	addr := c.addrs[0]
1498	if len(c.addrs) > 1 {
1499		addr = c.addrs[c.addrIdx%uint32(len(c.addrs))]
1500		c.addrIdx++
1501	}
1502	c.addrsLock.Unlock()
1503	return addr
1504}
1505
1506func (c *HostClient) dialHostHard() (conn net.Conn, err error) {
1507	// attempt to dial all the available hosts before giving up.
1508
1509	c.addrsLock.Lock()
1510	n := len(c.addrs)
1511	c.addrsLock.Unlock()
1512
1513	if n == 0 {
1514		// It looks like c.addrs isn't initialized yet.
1515		n = 1
1516	}
1517
1518	timeout := c.ReadTimeout + c.WriteTimeout
1519	if timeout <= 0 {
1520		timeout = DefaultDialTimeout
1521	}
1522	deadline := time.Now().Add(timeout)
1523	for n > 0 {
1524		addr := c.nextAddr()
1525		tlsConfig := c.cachedTLSConfig(addr)
1526		conn, err = dialAddr(addr, c.Dial, c.DialDualStack, c.IsTLS, tlsConfig)
1527		if err == nil {
1528			return conn, nil
1529		}
1530		if time.Since(deadline) >= 0 {
1531			break
1532		}
1533		n--
1534	}
1535	return nil, err
1536}
1537
1538func (c *HostClient) cachedTLSConfig(addr string) *tls.Config {
1539	if !c.IsTLS {
1540		return nil
1541	}
1542
1543	c.tlsConfigMapLock.Lock()
1544	if c.tlsConfigMap == nil {
1545		c.tlsConfigMap = make(map[string]*tls.Config)
1546	}
1547	cfg := c.tlsConfigMap[addr]
1548	if cfg == nil {
1549		cfg = newClientTLSConfig(c.TLSConfig, addr)
1550		c.tlsConfigMap[addr] = cfg
1551	}
1552	c.tlsConfigMapLock.Unlock()
1553
1554	return cfg
1555}
1556
1557func dialAddr(addr string, dial DialFunc, dialDualStack, isTLS bool, tlsConfig *tls.Config) (net.Conn, error) {
1558	if dial == nil {
1559		if dialDualStack {
1560			dial = DialDualStack
1561		} else {
1562			dial = Dial
1563		}
1564		addr = addMissingPort(addr, isTLS)
1565	}
1566	conn, err := dial(addr)
1567	if err != nil {
1568		return nil, err
1569	}
1570	if conn == nil {
1571		panic("BUG: DialFunc returned (nil, nil)")
1572	}
1573	if isTLS {
1574		conn = tls.Client(conn, tlsConfig)
1575	}
1576	return conn, nil
1577}
1578
1579func (c *HostClient) getClientName() []byte {
1580	v := c.clientName.Load()
1581	var clientName []byte
1582	if v == nil {
1583		clientName = []byte(c.Name)
1584		if len(clientName) == 0 && !c.NoDefaultUserAgentHeader {
1585			clientName = defaultUserAgent
1586		}
1587		c.clientName.Store(clientName)
1588	} else {
1589		clientName = v.([]byte)
1590	}
1591	return clientName
1592}
1593
1594func addMissingPort(addr string, isTLS bool) string {
1595	n := strings.Index(addr, ":")
1596	if n >= 0 {
1597		return addr
1598	}
1599	port := 80
1600	if isTLS {
1601		port = 443
1602	}
1603	return fmt.Sprintf("%s:%d", addr, port)
1604}
1605
1606// PipelineClient pipelines requests over a limited set of concurrent
1607// connections to the given Addr.
1608//
1609// This client may be used in highly loaded HTTP-based RPC systems for reducing
1610// context switches and network level overhead.
1611// See https://en.wikipedia.org/wiki/HTTP_pipelining for details.
1612//
1613// It is forbidden copying PipelineClient instances. Create new instances
1614// instead.
1615//
1616// It is safe calling PipelineClient methods from concurrently running
1617// goroutines.
1618type PipelineClient struct {
1619	noCopy noCopy
1620
1621	// Address of the host to connect to.
1622	Addr string
1623
1624	// The maximum number of concurrent connections to the Addr.
1625	//
1626	// A single connection is used by default.
1627	MaxConns int
1628
1629	// The maximum number of pending pipelined requests over
1630	// a single connection to Addr.
1631	//
1632	// DefaultMaxPendingRequests is used by default.
1633	MaxPendingRequests int
1634
1635	// The maximum delay before sending pipelined requests as a batch
1636	// to the server.
1637	//
1638	// By default requests are sent immediately to the server.
1639	MaxBatchDelay time.Duration
1640
1641	// Callback for connection establishing to the host.
1642	//
1643	// Default Dial is used if not set.
1644	Dial DialFunc
1645
1646	// Attempt to connect to both ipv4 and ipv6 host addresses
1647	// if set to true.
1648	//
1649	// This option is used only if default TCP dialer is used,
1650	// i.e. if Dial is blank.
1651	//
1652	// By default client connects only to ipv4 addresses,
1653	// since unfortunately ipv6 remains broken in many networks worldwide :)
1654	DialDualStack bool
1655
1656	// Whether to use TLS (aka SSL or HTTPS) for host connections.
1657	IsTLS bool
1658
1659	// Optional TLS config.
1660	TLSConfig *tls.Config
1661
1662	// Idle connection to the host is closed after this duration.
1663	//
1664	// By default idle connection is closed after
1665	// DefaultMaxIdleConnDuration.
1666	MaxIdleConnDuration time.Duration
1667
1668	// Buffer size for responses' reading.
1669	// This also limits the maximum header size.
1670	//
1671	// Default buffer size is used if 0.
1672	ReadBufferSize int
1673
1674	// Buffer size for requests' writing.
1675	//
1676	// Default buffer size is used if 0.
1677	WriteBufferSize int
1678
1679	// Maximum duration for full response reading (including body).
1680	//
1681	// By default response read timeout is unlimited.
1682	ReadTimeout time.Duration
1683
1684	// Maximum duration for full request writing (including body).
1685	//
1686	// By default request write timeout is unlimited.
1687	WriteTimeout time.Duration
1688
1689	// Logger for logging client errors.
1690	//
1691	// By default standard logger from log package is used.
1692	Logger Logger
1693
1694	connClients     []*pipelineConnClient
1695	connClientsLock sync.Mutex
1696}
1697
1698type pipelineConnClient struct {
1699	noCopy noCopy
1700
1701	Addr                string
1702	MaxPendingRequests  int
1703	MaxBatchDelay       time.Duration
1704	Dial                DialFunc
1705	DialDualStack       bool
1706	IsTLS               bool
1707	TLSConfig           *tls.Config
1708	MaxIdleConnDuration time.Duration
1709	ReadBufferSize      int
1710	WriteBufferSize     int
1711	ReadTimeout         time.Duration
1712	WriteTimeout        time.Duration
1713	Logger              Logger
1714
1715	workPool sync.Pool
1716
1717	chLock sync.Mutex
1718	chW    chan *pipelineWork
1719	chR    chan *pipelineWork
1720
1721	tlsConfigLock sync.Mutex
1722	tlsConfig     *tls.Config
1723}
1724
1725type pipelineWork struct {
1726	reqCopy  Request
1727	respCopy Response
1728	req      *Request
1729	resp     *Response
1730	t        *time.Timer
1731	deadline time.Time
1732	err      error
1733	done     chan struct{}
1734}
1735
1736// DoTimeout performs the given request and waits for response during
1737// the given timeout duration.
1738//
1739// Request must contain at least non-zero RequestURI with full url (including
1740// scheme and host) or non-zero Host header + RequestURI.
1741//
1742// The function doesn't follow redirects.
1743//
1744// Response is ignored if resp is nil.
1745//
1746// ErrTimeout is returned if the response wasn't returned during
1747// the given timeout.
1748//
1749// It is recommended obtaining req and resp via AcquireRequest
1750// and AcquireResponse in performance-critical code.
1751//
1752// Warning: DoTimeout does not terminate the request itself. The request will
1753// continue in the background and the response will be discarded.
1754// If requests take too long and the connection pool gets filled up please
1755// try setting a ReadTimeout.
1756func (c *PipelineClient) DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
1757	return c.DoDeadline(req, resp, time.Now().Add(timeout))
1758}
1759
1760// DoDeadline performs the given request and waits for response until
1761// the given deadline.
1762//
1763// Request must contain at least non-zero RequestURI with full url (including
1764// scheme and host) or non-zero Host header + RequestURI.
1765//
1766// The function doesn't follow redirects.
1767//
1768// Response is ignored if resp is nil.
1769//
1770// ErrTimeout is returned if the response wasn't returned until
1771// the given deadline.
1772//
1773// It is recommended obtaining req and resp via AcquireRequest
1774// and AcquireResponse in performance-critical code.
1775func (c *PipelineClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
1776	return c.getConnClient().DoDeadline(req, resp, deadline)
1777}
1778
1779func (c *pipelineConnClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
1780	c.init()
1781
1782	timeout := -time.Since(deadline)
1783	if timeout < 0 {
1784		return ErrTimeout
1785	}
1786
1787	w := acquirePipelineWork(&c.workPool, timeout)
1788	w.req = &w.reqCopy
1789	w.resp = &w.respCopy
1790
1791	// Make a copy of the request in order to avoid data races on timeouts
1792	req.copyToSkipBody(&w.reqCopy)
1793	swapRequestBody(req, &w.reqCopy)
1794
1795	// Put the request to outgoing queue
1796	select {
1797	case c.chW <- w:
1798		// Fast path: len(c.ch) < cap(c.ch)
1799	default:
1800		// Slow path
1801		select {
1802		case c.chW <- w:
1803		case <-w.t.C:
1804			releasePipelineWork(&c.workPool, w)
1805			return ErrTimeout
1806		}
1807	}
1808
1809	// Wait for the response
1810	var err error
1811	select {
1812	case <-w.done:
1813		if resp != nil {
1814			w.respCopy.copyToSkipBody(resp)
1815			swapResponseBody(resp, &w.respCopy)
1816		}
1817		err = w.err
1818		releasePipelineWork(&c.workPool, w)
1819	case <-w.t.C:
1820		err = ErrTimeout
1821	}
1822
1823	return err
1824}
1825
1826// Do performs the given http request and sets the corresponding response.
1827//
1828// Request must contain at least non-zero RequestURI with full url (including
1829// scheme and host) or non-zero Host header + RequestURI.
1830//
1831// The function doesn't follow redirects. Use Get* for following redirects.
1832//
1833// Response is ignored if resp is nil.
1834//
1835// It is recommended obtaining req and resp via AcquireRequest
1836// and AcquireResponse in performance-critical code.
1837func (c *PipelineClient) Do(req *Request, resp *Response) error {
1838	return c.getConnClient().Do(req, resp)
1839}
1840
1841func (c *pipelineConnClient) Do(req *Request, resp *Response) error {
1842	c.init()
1843
1844	w := acquirePipelineWork(&c.workPool, 0)
1845	w.req = req
1846	if resp != nil {
1847		w.resp = resp
1848	} else {
1849		w.resp = &w.respCopy
1850	}
1851
1852	// Put the request to outgoing queue
1853	select {
1854	case c.chW <- w:
1855	default:
1856		// Try substituting the oldest w with the current one.
1857		select {
1858		case wOld := <-c.chW:
1859			wOld.err = ErrPipelineOverflow
1860			wOld.done <- struct{}{}
1861		default:
1862		}
1863		select {
1864		case c.chW <- w:
1865		default:
1866			releasePipelineWork(&c.workPool, w)
1867			return ErrPipelineOverflow
1868		}
1869	}
1870
1871	// Wait for the response
1872	<-w.done
1873	err := w.err
1874
1875	releasePipelineWork(&c.workPool, w)
1876
1877	return err
1878}
1879
1880func (c *PipelineClient) getConnClient() *pipelineConnClient {
1881	c.connClientsLock.Lock()
1882	cc := c.getConnClientUnlocked()
1883	c.connClientsLock.Unlock()
1884	return cc
1885}
1886
1887func (c *PipelineClient) getConnClientUnlocked() *pipelineConnClient {
1888	if len(c.connClients) == 0 {
1889		return c.newConnClient()
1890	}
1891
1892	// Return the client with the minimum number of pending requests.
1893	minCC := c.connClients[0]
1894	minReqs := minCC.PendingRequests()
1895	if minReqs == 0 {
1896		return minCC
1897	}
1898	for i := 1; i < len(c.connClients); i++ {
1899		cc := c.connClients[i]
1900		reqs := cc.PendingRequests()
1901		if reqs == 0 {
1902			return cc
1903		}
1904		if reqs < minReqs {
1905			minCC = cc
1906			minReqs = reqs
1907		}
1908	}
1909
1910	maxConns := c.MaxConns
1911	if maxConns <= 0 {
1912		maxConns = 1
1913	}
1914	if len(c.connClients) < maxConns {
1915		return c.newConnClient()
1916	}
1917	return minCC
1918}
1919
1920func (c *PipelineClient) newConnClient() *pipelineConnClient {
1921	cc := &pipelineConnClient{
1922		Addr:                c.Addr,
1923		MaxPendingRequests:  c.MaxPendingRequests,
1924		MaxBatchDelay:       c.MaxBatchDelay,
1925		Dial:                c.Dial,
1926		DialDualStack:       c.DialDualStack,
1927		IsTLS:               c.IsTLS,
1928		TLSConfig:           c.TLSConfig,
1929		MaxIdleConnDuration: c.MaxIdleConnDuration,
1930		ReadBufferSize:      c.ReadBufferSize,
1931		WriteBufferSize:     c.WriteBufferSize,
1932		ReadTimeout:         c.ReadTimeout,
1933		WriteTimeout:        c.WriteTimeout,
1934		Logger:              c.Logger,
1935	}
1936	c.connClients = append(c.connClients, cc)
1937	return cc
1938}
1939
1940// ErrPipelineOverflow may be returned from PipelineClient.Do*
1941// if the requests' queue is overflown.
1942var ErrPipelineOverflow = errors.New("pipelined requests' queue has been overflown. Increase MaxConns and/or MaxPendingRequests")
1943
1944// DefaultMaxPendingRequests is the default value
1945// for PipelineClient.MaxPendingRequests.
1946const DefaultMaxPendingRequests = 1024
1947
1948func (c *pipelineConnClient) init() {
1949	c.chLock.Lock()
1950	if c.chR == nil {
1951		maxPendingRequests := c.MaxPendingRequests
1952		if maxPendingRequests <= 0 {
1953			maxPendingRequests = DefaultMaxPendingRequests
1954		}
1955		c.chR = make(chan *pipelineWork, maxPendingRequests)
1956		if c.chW == nil {
1957			c.chW = make(chan *pipelineWork, maxPendingRequests)
1958		}
1959		go func() {
1960			if err := c.worker(); err != nil {
1961				c.logger().Printf("error in PipelineClient(%q): %s", c.Addr, err)
1962				if netErr, ok := err.(net.Error); ok && netErr.Temporary() {
1963					// Throttle client reconnections on temporary errors
1964					time.Sleep(time.Second)
1965				}
1966			}
1967
1968			c.chLock.Lock()
1969			// Do not reset c.chW to nil, since it may contain
1970			// pending requests, which could be served on the next
1971			// connection to the host.
1972			c.chR = nil
1973			c.chLock.Unlock()
1974		}()
1975	}
1976	c.chLock.Unlock()
1977}
1978
1979func (c *pipelineConnClient) worker() error {
1980	tlsConfig := c.cachedTLSConfig()
1981	conn, err := dialAddr(c.Addr, c.Dial, c.DialDualStack, c.IsTLS, tlsConfig)
1982	if err != nil {
1983		return err
1984	}
1985
1986	// Start reader and writer
1987	stopW := make(chan struct{})
1988	doneW := make(chan error)
1989	go func() {
1990		doneW <- c.writer(conn, stopW)
1991	}()
1992	stopR := make(chan struct{})
1993	doneR := make(chan error)
1994	go func() {
1995		doneR <- c.reader(conn, stopR)
1996	}()
1997
1998	// Wait until reader and writer are stopped
1999	select {
2000	case err = <-doneW:
2001		conn.Close()
2002		close(stopR)
2003		<-doneR
2004	case err = <-doneR:
2005		conn.Close()
2006		close(stopW)
2007		<-doneW
2008	}
2009
2010	// Notify pending readers
2011	for len(c.chR) > 0 {
2012		w := <-c.chR
2013		w.err = errPipelineConnStopped
2014		w.done <- struct{}{}
2015	}
2016
2017	return err
2018}
2019
2020func (c *pipelineConnClient) cachedTLSConfig() *tls.Config {
2021	if !c.IsTLS {
2022		return nil
2023	}
2024
2025	c.tlsConfigLock.Lock()
2026	cfg := c.tlsConfig
2027	if cfg == nil {
2028		cfg = newClientTLSConfig(c.TLSConfig, c.Addr)
2029		c.tlsConfig = cfg
2030	}
2031	c.tlsConfigLock.Unlock()
2032
2033	return cfg
2034}
2035
2036func (c *pipelineConnClient) writer(conn net.Conn, stopCh <-chan struct{}) error {
2037	writeBufferSize := c.WriteBufferSize
2038	if writeBufferSize <= 0 {
2039		writeBufferSize = defaultWriteBufferSize
2040	}
2041	bw := bufio.NewWriterSize(conn, writeBufferSize)
2042	defer bw.Flush()
2043	chR := c.chR
2044	chW := c.chW
2045	writeTimeout := c.WriteTimeout
2046
2047	maxIdleConnDuration := c.MaxIdleConnDuration
2048	if maxIdleConnDuration <= 0 {
2049		maxIdleConnDuration = DefaultMaxIdleConnDuration
2050	}
2051	maxBatchDelay := c.MaxBatchDelay
2052
2053	var (
2054		stopTimer      = time.NewTimer(time.Hour)
2055		flushTimer     = time.NewTimer(time.Hour)
2056		flushTimerCh   <-chan time.Time
2057		instantTimerCh = make(chan time.Time)
2058
2059		w   *pipelineWork
2060		err error
2061	)
2062	close(instantTimerCh)
2063	for {
2064	againChW:
2065		select {
2066		case w = <-chW:
2067			// Fast path: len(chW) > 0
2068		default:
2069			// Slow path
2070			stopTimer.Reset(maxIdleConnDuration)
2071			select {
2072			case w = <-chW:
2073			case <-stopTimer.C:
2074				return nil
2075			case <-stopCh:
2076				return nil
2077			case <-flushTimerCh:
2078				if err = bw.Flush(); err != nil {
2079					return err
2080				}
2081				flushTimerCh = nil
2082				goto againChW
2083			}
2084		}
2085
2086		if !w.deadline.IsZero() && time.Since(w.deadline) >= 0 {
2087			w.err = ErrTimeout
2088			w.done <- struct{}{}
2089			continue
2090		}
2091
2092		w.resp.parseNetConn(conn)
2093
2094		if writeTimeout > 0 {
2095			// Set Deadline every time, since golang has fixed the performance issue
2096			// See https://github.com/golang/go/issues/15133#issuecomment-271571395 for details
2097			currentTime := time.Now()
2098			if err = conn.SetWriteDeadline(currentTime.Add(writeTimeout)); err != nil {
2099				w.err = err
2100				w.done <- struct{}{}
2101				return err
2102			}
2103		}
2104		if err = w.req.Write(bw); err != nil {
2105			w.err = err
2106			w.done <- struct{}{}
2107			return err
2108		}
2109		if flushTimerCh == nil && (len(chW) == 0 || len(chR) == cap(chR)) {
2110			if maxBatchDelay > 0 {
2111				flushTimer.Reset(maxBatchDelay)
2112				flushTimerCh = flushTimer.C
2113			} else {
2114				flushTimerCh = instantTimerCh
2115			}
2116		}
2117
2118	againChR:
2119		select {
2120		case chR <- w:
2121			// Fast path: len(chR) < cap(chR)
2122		default:
2123			// Slow path
2124			select {
2125			case chR <- w:
2126			case <-stopCh:
2127				w.err = errPipelineConnStopped
2128				w.done <- struct{}{}
2129				return nil
2130			case <-flushTimerCh:
2131				if err = bw.Flush(); err != nil {
2132					w.err = err
2133					w.done <- struct{}{}
2134					return err
2135				}
2136				flushTimerCh = nil
2137				goto againChR
2138			}
2139		}
2140	}
2141}
2142
2143func (c *pipelineConnClient) reader(conn net.Conn, stopCh <-chan struct{}) error {
2144	readBufferSize := c.ReadBufferSize
2145	if readBufferSize <= 0 {
2146		readBufferSize = defaultReadBufferSize
2147	}
2148	br := bufio.NewReaderSize(conn, readBufferSize)
2149	chR := c.chR
2150	readTimeout := c.ReadTimeout
2151
2152	var (
2153		w   *pipelineWork
2154		err error
2155	)
2156	for {
2157		select {
2158		case w = <-chR:
2159			// Fast path: len(chR) > 0
2160		default:
2161			// Slow path
2162			select {
2163			case w = <-chR:
2164			case <-stopCh:
2165				return nil
2166			}
2167		}
2168
2169		if readTimeout > 0 {
2170			// Set Deadline every time, since golang has fixed the performance issue
2171			// See https://github.com/golang/go/issues/15133#issuecomment-271571395 for details
2172			currentTime := time.Now()
2173			if err = conn.SetReadDeadline(currentTime.Add(readTimeout)); err != nil {
2174				w.err = err
2175				w.done <- struct{}{}
2176				return err
2177			}
2178		}
2179		if err = w.resp.Read(br); err != nil {
2180			w.err = err
2181			w.done <- struct{}{}
2182			return err
2183		}
2184
2185		w.done <- struct{}{}
2186	}
2187}
2188
2189func (c *pipelineConnClient) logger() Logger {
2190	if c.Logger != nil {
2191		return c.Logger
2192	}
2193	return defaultLogger
2194}
2195
2196// PendingRequests returns the current number of pending requests pipelined
2197// to the server.
2198//
2199// This number may exceed MaxPendingRequests*MaxConns by up to two times, since
2200// each connection to the server may keep up to MaxPendingRequests requests
2201// in the queue before sending them to the server.
2202//
2203// This function may be used for balancing load among multiple PipelineClient
2204// instances.
2205func (c *PipelineClient) PendingRequests() int {
2206	c.connClientsLock.Lock()
2207	n := 0
2208	for _, cc := range c.connClients {
2209		n += cc.PendingRequests()
2210	}
2211	c.connClientsLock.Unlock()
2212	return n
2213}
2214
2215func (c *pipelineConnClient) PendingRequests() int {
2216	c.init()
2217
2218	c.chLock.Lock()
2219	n := len(c.chR) + len(c.chW)
2220	c.chLock.Unlock()
2221	return n
2222}
2223
2224var errPipelineConnStopped = errors.New("pipeline connection has been stopped")
2225
2226func acquirePipelineWork(pool *sync.Pool, timeout time.Duration) *pipelineWork {
2227	v := pool.Get()
2228	if v == nil {
2229		v = &pipelineWork{
2230			done: make(chan struct{}, 1),
2231		}
2232	}
2233	w := v.(*pipelineWork)
2234	if timeout > 0 {
2235		if w.t == nil {
2236			w.t = time.NewTimer(timeout)
2237		} else {
2238			w.t.Reset(timeout)
2239		}
2240		w.deadline = time.Now().Add(timeout)
2241	} else {
2242		w.deadline = zeroTime
2243	}
2244	return w
2245}
2246
2247func releasePipelineWork(pool *sync.Pool, w *pipelineWork) {
2248	if w.t != nil {
2249		w.t.Stop()
2250	}
2251	w.reqCopy.Reset()
2252	w.respCopy.Reset()
2253	w.req = nil
2254	w.resp = nil
2255	w.err = nil
2256	pool.Put(w)
2257}
2258