1/*
2 *
3 * Copyright 2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22	"fmt"
23	"net"
24	"sync"
25
26	"golang.org/x/net/context"
27	"google.golang.org/grpc/codes"
28	"google.golang.org/grpc/credentials"
29	"google.golang.org/grpc/grpclog"
30	"google.golang.org/grpc/naming"
31	"google.golang.org/grpc/status"
32)
33
// Address describes a single server that the client may connect to.
//
// Deprecated: please use package balancer.
type Address struct {
	// Addr is the address of the server to which a connection is established.
	Addr string
	// Metadata is extra information attached to Addr; a Balancer may consult
	// it when making load balancing decisions.
	Metadata interface{}
}
44
45// BalancerConfig specifies the configurations for Balancer.
46//
47// Deprecated: please use package balancer.
48type BalancerConfig struct {
49	// DialCreds is the transport credential the Balancer implementation can
50	// use to dial to a remote load balancer server. The Balancer implementations
51	// can ignore this if it does not need to talk to another party securely.
52	DialCreds credentials.TransportCredentials
53	// Dialer is the custom dialer the Balancer implementation can use to dial
54	// to a remote load balancer server. The Balancer implementations
55	// can ignore this if it doesn't need to talk to remote balancer.
56	Dialer func(context.Context, string) (net.Conn, error)
57}
58
// BalancerGetOptions carries the per-call options of Balancer.Get.
//
// Deprecated: please use package balancer.
type BalancerGetOptions struct {
	// BlockingWait tells Get whether it should block while no connected
	// address is available.
	BlockingWait bool
}
67
68// Balancer chooses network addresses for RPCs.
69//
70// Deprecated: please use package balancer.
71type Balancer interface {
72	// Start does the initialization work to bootstrap a Balancer. For example,
73	// this function may start the name resolution and watch the updates. It will
74	// be called when dialing.
75	Start(target string, config BalancerConfig) error
76	// Up informs the Balancer that gRPC has a connection to the server at
77	// addr. It returns down which is called once the connection to addr gets
78	// lost or closed.
79	// TODO: It is not clear how to construct and take advantage of the meaningful error
80	// parameter for down. Need realistic demands to guide.
81	Up(addr Address) (down func(error))
82	// Get gets the address of a server for the RPC corresponding to ctx.
83	// i) If it returns a connected address, gRPC internals issues the RPC on the
84	// connection to this address;
85	// ii) If it returns an address on which the connection is under construction
86	// (initiated by Notify(...)) but not connected, gRPC internals
87	//  * fails RPC if the RPC is fail-fast and connection is in the TransientFailure or
88	//  Shutdown state;
89	//  or
90	//  * issues RPC on the connection otherwise.
91	// iii) If it returns an address on which the connection does not exist, gRPC
92	// internals treats it as an error and will fail the corresponding RPC.
93	//
94	// Therefore, the following is the recommended rule when writing a custom Balancer.
95	// If opts.BlockingWait is true, it should return a connected address or
96	// block if there is no connected address. It should respect the timeout or
97	// cancellation of ctx when blocking. If opts.BlockingWait is false (for fail-fast
98	// RPCs), it should return an address it has notified via Notify(...) immediately
99	// instead of blocking.
100	//
101	// The function returns put which is called once the rpc has completed or failed.
102	// put can collect and report RPC stats to a remote load balancer.
103	//
104	// This function should only return the errors Balancer cannot recover by itself.
105	// gRPC internals will fail the RPC if an error is returned.
106	Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error)
107	// Notify returns a channel that is used by gRPC internals to watch the addresses
108	// gRPC needs to connect. The addresses might be from a name resolver or remote
109	// load balancer. gRPC internals will compare it with the existing connected
110	// addresses. If the address Balancer notified is not in the existing connected
111	// addresses, gRPC starts to connect the address. If an address in the existing
112	// connected addresses is not in the notification list, the corresponding connection
113	// is shutdown gracefully. Otherwise, there are no operations to take. Note that
114	// the Address slice must be the full list of the Addresses which should be connected.
115	// It is NOT delta.
116	Notify() <-chan []Address
117	// Close shuts down the balancer.
118	Close() error
119}
120
// downErr is a net.Error implementation. gRPC internals construct it and pass
// it to the down callback of a Balancer.
type downErr struct {
	timeout   bool
	temporary bool
	desc      string
}

// Error returns the description the error was created with.
func (e downErr) Error() string {
	return e.desc
}

// Timeout reports the timeout flag the error was created with.
func (e downErr) Timeout() bool {
	return e.timeout
}

// Temporary reports the temporary flag the error was created with.
func (e downErr) Temporary() bool {
	return e.temporary
}

// downErrorf builds a downErr carrying the given timeout and temporary flags.
// The description is produced by formatting a according to format, following
// fmt.Sprintf rules.
func downErrorf(timeout, temporary bool, format string, a ...interface{}) downErr {
	var e downErr
	e.timeout = timeout
	e.temporary = temporary
	e.desc = fmt.Sprintf(format, a...)
	return e
}
140
141// RoundRobin returns a Balancer that selects addresses round-robin. It uses r to watch
142// the name resolution updates and updates the addresses available correspondingly.
143//
144// Deprecated: please use package balancer/roundrobin.
145func RoundRobin(r naming.Resolver) Balancer {
146	return &roundRobin{r: r}
147}
148
149type addrInfo struct {
150	addr      Address
151	connected bool
152}
153
154type roundRobin struct {
155	r      naming.Resolver
156	w      naming.Watcher
157	addrs  []*addrInfo // all the addresses the client should potentially connect
158	mu     sync.Mutex
159	addrCh chan []Address // the channel to notify gRPC internals the list of addresses the client should connect to.
160	next   int            // index of the next address to return for Get()
161	waitCh chan struct{}  // the channel to block when there is no connected address available
162	done   bool           // The Balancer is closed.
163}
164
165func (rr *roundRobin) watchAddrUpdates() error {
166	updates, err := rr.w.Next()
167	if err != nil {
168		grpclog.Warningf("grpc: the naming watcher stops working due to %v.", err)
169		return err
170	}
171	rr.mu.Lock()
172	defer rr.mu.Unlock()
173	for _, update := range updates {
174		addr := Address{
175			Addr:     update.Addr,
176			Metadata: update.Metadata,
177		}
178		switch update.Op {
179		case naming.Add:
180			var exist bool
181			for _, v := range rr.addrs {
182				if addr == v.addr {
183					exist = true
184					grpclog.Infoln("grpc: The name resolver wanted to add an existing address: ", addr)
185					break
186				}
187			}
188			if exist {
189				continue
190			}
191			rr.addrs = append(rr.addrs, &addrInfo{addr: addr})
192		case naming.Delete:
193			for i, v := range rr.addrs {
194				if addr == v.addr {
195					copy(rr.addrs[i:], rr.addrs[i+1:])
196					rr.addrs = rr.addrs[:len(rr.addrs)-1]
197					break
198				}
199			}
200		default:
201			grpclog.Errorln("Unknown update.Op ", update.Op)
202		}
203	}
204	// Make a copy of rr.addrs and write it onto rr.addrCh so that gRPC internals gets notified.
205	open := make([]Address, len(rr.addrs))
206	for i, v := range rr.addrs {
207		open[i] = v.addr
208	}
209	if rr.done {
210		return ErrClientConnClosing
211	}
212	select {
213	case <-rr.addrCh:
214	default:
215	}
216	rr.addrCh <- open
217	return nil
218}
219
220func (rr *roundRobin) Start(target string, config BalancerConfig) error {
221	rr.mu.Lock()
222	defer rr.mu.Unlock()
223	if rr.done {
224		return ErrClientConnClosing
225	}
226	if rr.r == nil {
227		// If there is no name resolver installed, it is not needed to
228		// do name resolution. In this case, target is added into rr.addrs
229		// as the only address available and rr.addrCh stays nil.
230		rr.addrs = append(rr.addrs, &addrInfo{addr: Address{Addr: target}})
231		return nil
232	}
233	w, err := rr.r.Resolve(target)
234	if err != nil {
235		return err
236	}
237	rr.w = w
238	rr.addrCh = make(chan []Address, 1)
239	go func() {
240		for {
241			if err := rr.watchAddrUpdates(); err != nil {
242				return
243			}
244		}
245	}()
246	return nil
247}
248
249// Up sets the connected state of addr and sends notification if there are pending
250// Get() calls.
251func (rr *roundRobin) Up(addr Address) func(error) {
252	rr.mu.Lock()
253	defer rr.mu.Unlock()
254	var cnt int
255	for _, a := range rr.addrs {
256		if a.addr == addr {
257			if a.connected {
258				return nil
259			}
260			a.connected = true
261		}
262		if a.connected {
263			cnt++
264		}
265	}
266	// addr is only one which is connected. Notify the Get() callers who are blocking.
267	if cnt == 1 && rr.waitCh != nil {
268		close(rr.waitCh)
269		rr.waitCh = nil
270	}
271	return func(err error) {
272		rr.down(addr, err)
273	}
274}
275
276// down unsets the connected state of addr.
277func (rr *roundRobin) down(addr Address, err error) {
278	rr.mu.Lock()
279	defer rr.mu.Unlock()
280	for _, a := range rr.addrs {
281		if addr == a.addr {
282			a.connected = false
283			break
284		}
285	}
286}
287
288// Get returns the next addr in the rotation.
289func (rr *roundRobin) Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) {
290	var ch chan struct{}
291	rr.mu.Lock()
292	if rr.done {
293		rr.mu.Unlock()
294		err = ErrClientConnClosing
295		return
296	}
297
298	if len(rr.addrs) > 0 {
299		if rr.next >= len(rr.addrs) {
300			rr.next = 0
301		}
302		next := rr.next
303		for {
304			a := rr.addrs[next]
305			next = (next + 1) % len(rr.addrs)
306			if a.connected {
307				addr = a.addr
308				rr.next = next
309				rr.mu.Unlock()
310				return
311			}
312			if next == rr.next {
313				// Has iterated all the possible address but none is connected.
314				break
315			}
316		}
317	}
318	if !opts.BlockingWait {
319		if len(rr.addrs) == 0 {
320			rr.mu.Unlock()
321			err = status.Errorf(codes.Unavailable, "there is no address available")
322			return
323		}
324		// Returns the next addr on rr.addrs for failfast RPCs.
325		addr = rr.addrs[rr.next].addr
326		rr.next++
327		rr.mu.Unlock()
328		return
329	}
330	// Wait on rr.waitCh for non-failfast RPCs.
331	if rr.waitCh == nil {
332		ch = make(chan struct{})
333		rr.waitCh = ch
334	} else {
335		ch = rr.waitCh
336	}
337	rr.mu.Unlock()
338	for {
339		select {
340		case <-ctx.Done():
341			err = ctx.Err()
342			return
343		case <-ch:
344			rr.mu.Lock()
345			if rr.done {
346				rr.mu.Unlock()
347				err = ErrClientConnClosing
348				return
349			}
350
351			if len(rr.addrs) > 0 {
352				if rr.next >= len(rr.addrs) {
353					rr.next = 0
354				}
355				next := rr.next
356				for {
357					a := rr.addrs[next]
358					next = (next + 1) % len(rr.addrs)
359					if a.connected {
360						addr = a.addr
361						rr.next = next
362						rr.mu.Unlock()
363						return
364					}
365					if next == rr.next {
366						// Has iterated all the possible address but none is connected.
367						break
368					}
369				}
370			}
371			// The newly added addr got removed by Down() again.
372			if rr.waitCh == nil {
373				ch = make(chan struct{})
374				rr.waitCh = ch
375			} else {
376				ch = rr.waitCh
377			}
378			rr.mu.Unlock()
379		}
380	}
381}
382
383func (rr *roundRobin) Notify() <-chan []Address {
384	return rr.addrCh
385}
386
387func (rr *roundRobin) Close() error {
388	rr.mu.Lock()
389	defer rr.mu.Unlock()
390	if rr.done {
391		return errBalancerClosed
392	}
393	rr.done = true
394	if rr.w != nil {
395		rr.w.Close()
396	}
397	if rr.waitCh != nil {
398		close(rr.waitCh)
399		rr.waitCh = nil
400	}
401	if rr.addrCh != nil {
402		close(rr.addrCh)
403	}
404	return nil
405}
406
407// pickFirst is used to test multi-addresses in one addrConn in which all addresses share the same addrConn.
408// It is a wrapper around roundRobin balancer. The logic of all methods works fine because balancer.Get()
409// returns the only address Up by resetTransport().
410type pickFirst struct {
411	*roundRobin
412}
413
414func pickFirstBalancerV1(r naming.Resolver) Balancer {
415	return &pickFirst{&roundRobin{r: r}}
416}
417