1/*
2 *
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package xdsclient
19
20import (
21	"context"
22
23	"google.golang.org/grpc"
24	"google.golang.org/grpc/xds/internal/xdsclient/load"
25)
26
27// ReportLoad starts an load reporting stream to the given server. If the server
28// is not an empty string, and is different from the management server, a new
29// ClientConn will be created.
30//
31// The same options used for creating the Client will be used (including
32// NodeProto, and dial options if necessary).
33//
34// It returns a Store for the user to report loads, a function to cancel the
35// load reporting stream.
36func (c *clientImpl) ReportLoad(server string) (*load.Store, func()) {
37	c.lrsMu.Lock()
38	defer c.lrsMu.Unlock()
39
40	// If there's already a client to this server, use it. Otherwise, create
41	// one.
42	lrsC, ok := c.lrsClients[server]
43	if !ok {
44		lrsC = newLRSClient(c, server)
45		c.lrsClients[server] = lrsC
46	}
47
48	store := lrsC.ref()
49	return store, func() {
50		// This is a callback, need to hold lrsMu.
51		c.lrsMu.Lock()
52		defer c.lrsMu.Unlock()
53		if lrsC.unRef() {
54			// Delete the lrsClient from map if this is the last reference.
55			delete(c.lrsClients, server)
56		}
57	}
58}
59
60// lrsClient maps to one lrsServer. It contains:
61// - a ClientConn to this server (only if it's different from the management
62// server)
63// - a load.Store that contains loads only for this server
64type lrsClient struct {
65	parent *clientImpl
66	server string
67
68	cc           *grpc.ClientConn // nil if the server is same as the management server
69	refCount     int
70	cancelStream func()
71	loadStore    *load.Store
72}
73
74// newLRSClient creates a new LRS stream to the server.
75func newLRSClient(parent *clientImpl, server string) *lrsClient {
76	return &lrsClient{
77		parent:   parent,
78		server:   server,
79		refCount: 0,
80	}
81}
82
83// ref increments the refCount. If this is the first ref, it starts the LRS stream.
84//
85// Not thread-safe, caller needs to synchronize.
86func (lrsC *lrsClient) ref() *load.Store {
87	lrsC.refCount++
88	if lrsC.refCount == 1 {
89		lrsC.startStream()
90	}
91	return lrsC.loadStore
92}
93
94// unRef decrements the refCount, and closes the stream if refCount reaches 0
95// (and close the cc if cc is not xDS cc). It returns whether refCount reached 0
96// after this call.
97//
98// Not thread-safe, caller needs to synchronize.
99func (lrsC *lrsClient) unRef() (closed bool) {
100	lrsC.refCount--
101	if lrsC.refCount != 0 {
102		return false
103	}
104	lrsC.parent.logger.Infof("Stopping load report to server: %s", lrsC.server)
105	lrsC.cancelStream()
106	if lrsC.cc != nil {
107		lrsC.cc.Close()
108	}
109	return true
110}
111
112// startStream starts the LRS stream to the server. If server is not the same
113// management server from the parent, it also creates a ClientConn.
114func (lrsC *lrsClient) startStream() {
115	var cc *grpc.ClientConn
116
117	lrsC.parent.logger.Infof("Starting load report to server: %s", lrsC.server)
118	if lrsC.server == "" || lrsC.server == lrsC.parent.config.BalancerName {
119		// Reuse the xDS client if server is the same.
120		cc = lrsC.parent.cc
121	} else {
122		lrsC.parent.logger.Infof("LRS server is different from management server, starting a new ClientConn")
123		ccNew, err := grpc.Dial(lrsC.server, lrsC.parent.config.Creds)
124		if err != nil {
125			// An error from a non-blocking dial indicates something serious.
126			lrsC.parent.logger.Infof("xds: failed to dial load report server {%s}: %v", lrsC.server, err)
127			return
128		}
129		cc = ccNew
130		lrsC.cc = ccNew
131	}
132
133	var ctx context.Context
134	ctx, lrsC.cancelStream = context.WithCancel(context.Background())
135
136	// Create the store and stream.
137	lrsC.loadStore = load.NewStore()
138	go lrsC.parent.apiClient.reportLoad(ctx, cc, loadReportingOptions{loadStore: lrsC.loadStore})
139}
140