1/* 2 * 3 * Copyright 2019 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18package xdsclient 19 20import ( 21 "context" 22 23 "google.golang.org/grpc" 24 "google.golang.org/grpc/xds/internal/xdsclient/load" 25) 26 27// ReportLoad starts an load reporting stream to the given server. If the server 28// is not an empty string, and is different from the management server, a new 29// ClientConn will be created. 30// 31// The same options used for creating the Client will be used (including 32// NodeProto, and dial options if necessary). 33// 34// It returns a Store for the user to report loads, a function to cancel the 35// load reporting stream. 36func (c *clientImpl) ReportLoad(server string) (*load.Store, func()) { 37 c.lrsMu.Lock() 38 defer c.lrsMu.Unlock() 39 40 // If there's already a client to this server, use it. Otherwise, create 41 // one. 42 lrsC, ok := c.lrsClients[server] 43 if !ok { 44 lrsC = newLRSClient(c, server) 45 c.lrsClients[server] = lrsC 46 } 47 48 store := lrsC.ref() 49 return store, func() { 50 // This is a callback, need to hold lrsMu. 51 c.lrsMu.Lock() 52 defer c.lrsMu.Unlock() 53 if lrsC.unRef() { 54 // Delete the lrsClient from map if this is the last reference. 55 delete(c.lrsClients, server) 56 } 57 } 58} 59 60// lrsClient maps to one lrsServer. It contains: 61// - a ClientConn to this server (only if it's different from the management 62// server) 63// - a load.Store that contains loads only for this server 64type lrsClient struct { 65 parent *clientImpl 66 server string 67 68 cc *grpc.ClientConn // nil if the server is same as the management server 69 refCount int 70 cancelStream func() 71 loadStore *load.Store 72} 73 74// newLRSClient creates a new LRS stream to the server. 75func newLRSClient(parent *clientImpl, server string) *lrsClient { 76 return &lrsClient{ 77 parent: parent, 78 server: server, 79 refCount: 0, 80 } 81} 82 83// ref increments the refCount. If this is the first ref, it starts the LRS stream. 84// 85// Not thread-safe, caller needs to synchronize. 86func (lrsC *lrsClient) ref() *load.Store { 87 lrsC.refCount++ 88 if lrsC.refCount == 1 { 89 lrsC.startStream() 90 } 91 return lrsC.loadStore 92} 93 94// unRef decrements the refCount, and closes the stream if refCount reaches 0 95// (and close the cc if cc is not xDS cc). It returns whether refCount reached 0 96// after this call. 97// 98// Not thread-safe, caller needs to synchronize. 99func (lrsC *lrsClient) unRef() (closed bool) { 100 lrsC.refCount-- 101 if lrsC.refCount != 0 { 102 return false 103 } 104 lrsC.parent.logger.Infof("Stopping load report to server: %s", lrsC.server) 105 lrsC.cancelStream() 106 if lrsC.cc != nil { 107 lrsC.cc.Close() 108 } 109 return true 110} 111 112// startStream starts the LRS stream to the server. If server is not the same 113// management server from the parent, it also creates a ClientConn. 114func (lrsC *lrsClient) startStream() { 115 var cc *grpc.ClientConn 116 117 lrsC.parent.logger.Infof("Starting load report to server: %s", lrsC.server) 118 if lrsC.server == "" || lrsC.server == lrsC.parent.config.BalancerName { 119 // Reuse the xDS client if server is the same. 120 cc = lrsC.parent.cc 121 } else { 122 lrsC.parent.logger.Infof("LRS server is different from management server, starting a new ClientConn") 123 ccNew, err := grpc.Dial(lrsC.server, lrsC.parent.config.Creds) 124 if err != nil { 125 // An error from a non-blocking dial indicates something serious. 126 lrsC.parent.logger.Infof("xds: failed to dial load report server {%s}: %v", lrsC.server, err) 127 return 128 } 129 cc = ccNew 130 lrsC.cc = ccNew 131 } 132 133 var ctx context.Context 134 ctx, lrsC.cancelStream = context.WithCancel(context.Background()) 135 136 // Create the store and stream. 137 lrsC.loadStore = load.NewStore() 138 go lrsC.parent.apiClient.reportLoad(ctx, cc, loadReportingOptions{loadStore: lrsC.loadStore}) 139} 140