1// Copyright (c) 2017 Uber Technologies, Inc. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package remote 16 17import ( 18 "fmt" 19 "net/url" 20 "sync" 21 "time" 22 23 "github.com/uber/jaeger-client-go/internal/baggage" 24 thrift "github.com/uber/jaeger-client-go/thrift-gen/baggage" 25 "github.com/uber/jaeger-client-go/utils" 26) 27 28type httpBaggageRestrictionManagerProxy struct { 29 url string 30} 31 32func newHTTPBaggageRestrictionManagerProxy(hostPort, serviceName string) *httpBaggageRestrictionManagerProxy { 33 v := url.Values{} 34 v.Set("service", serviceName) 35 return &httpBaggageRestrictionManagerProxy{ 36 url: fmt.Sprintf("http://%s/baggageRestrictions?%s", hostPort, v.Encode()), 37 } 38} 39 40func (s *httpBaggageRestrictionManagerProxy) GetBaggageRestrictions(serviceName string) ([]*thrift.BaggageRestriction, error) { 41 var out []*thrift.BaggageRestriction 42 if err := utils.GetJSON(s.url, &out); err != nil { 43 return nil, err 44 } 45 return out, nil 46} 47 48// RestrictionManager manages baggage restrictions by retrieving baggage restrictions from agent 49type RestrictionManager struct { 50 options 51 52 mux sync.RWMutex 53 serviceName string 54 restrictions map[string]*baggage.Restriction 55 thriftProxy thrift.BaggageRestrictionManager 56 pollStopped sync.WaitGroup 57 stopPoll chan struct{} 58 invalidRestriction *baggage.Restriction 59 validRestriction *baggage.Restriction 60 61 // Determines if the manager has successfully retrieved baggage restrictions from agent 62 initialized bool 63} 64 65// NewRestrictionManager returns a BaggageRestrictionManager that polls the agent for the latest 66// baggage restrictions. 67func NewRestrictionManager(serviceName string, options ...Option) *RestrictionManager { 68 // TODO there is a developing use case where a single tracer can generate traces on behalf of many services. 69 // restrictionsMap will need to exist per service 70 opts := applyOptions(options...) 71 m := &RestrictionManager{ 72 serviceName: serviceName, 73 options: opts, 74 restrictions: make(map[string]*baggage.Restriction), 75 thriftProxy: newHTTPBaggageRestrictionManagerProxy(opts.hostPort, serviceName), 76 stopPoll: make(chan struct{}), 77 invalidRestriction: baggage.NewRestriction(false, 0), 78 validRestriction: baggage.NewRestriction(true, defaultMaxValueLength), 79 } 80 m.pollStopped.Add(1) 81 go m.pollManager() 82 return m 83} 84 85// isReady returns true if the manager has retrieved baggage restrictions from the remote source. 86func (m *RestrictionManager) isReady() bool { 87 m.mux.RLock() 88 defer m.mux.RUnlock() 89 return m.initialized 90} 91 92// GetRestriction implements RestrictionManager#GetRestriction. 93func (m *RestrictionManager) GetRestriction(service, key string) *baggage.Restriction { 94 m.mux.RLock() 95 defer m.mux.RUnlock() 96 if !m.initialized { 97 if m.denyBaggageOnInitializationFailure { 98 return m.invalidRestriction 99 } 100 return m.validRestriction 101 } 102 if restriction, ok := m.restrictions[key]; ok { 103 return restriction 104 } 105 return m.invalidRestriction 106} 107 108// Close stops remote polling and closes the RemoteRestrictionManager. 109func (m *RestrictionManager) Close() error { 110 close(m.stopPoll) 111 m.pollStopped.Wait() 112 return nil 113} 114 115func (m *RestrictionManager) pollManager() { 116 defer m.pollStopped.Done() 117 // attempt to initialize baggage restrictions 118 if err := m.updateRestrictions(); err != nil { 119 m.logger.Error(fmt.Sprintf("Failed to initialize baggage restrictions: %s", err.Error())) 120 } 121 ticker := time.NewTicker(m.refreshInterval) 122 defer ticker.Stop() 123 124 for { 125 select { 126 case <-ticker.C: 127 if err := m.updateRestrictions(); err != nil { 128 m.logger.Error(fmt.Sprintf("Failed to update baggage restrictions: %s", err.Error())) 129 } 130 case <-m.stopPoll: 131 return 132 } 133 } 134} 135 136func (m *RestrictionManager) updateRestrictions() error { 137 restrictions, err := m.thriftProxy.GetBaggageRestrictions(m.serviceName) 138 if err != nil { 139 m.metrics.BaggageRestrictionsUpdateFailure.Inc(1) 140 return err 141 } 142 newRestrictions := m.parseRestrictions(restrictions) 143 m.metrics.BaggageRestrictionsUpdateSuccess.Inc(1) 144 m.mux.Lock() 145 defer m.mux.Unlock() 146 m.initialized = true 147 m.restrictions = newRestrictions 148 return nil 149} 150 151func (m *RestrictionManager) parseRestrictions(restrictions []*thrift.BaggageRestriction) map[string]*baggage.Restriction { 152 setters := make(map[string]*baggage.Restriction, len(restrictions)) 153 for _, restriction := range restrictions { 154 setters[restriction.BaggageKey] = baggage.NewRestriction(true, int(restriction.MaxValueLength)) 155 } 156 return setters 157} 158