1// Copyright (c) 2017 Uber Technologies, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package remote
16
17import (
18	"fmt"
19	"net/url"
20	"sync"
21	"time"
22
23	"github.com/uber/jaeger-client-go/internal/baggage"
24	thrift "github.com/uber/jaeger-client-go/thrift-gen/baggage"
25	"github.com/uber/jaeger-client-go/utils"
26)
27
28type httpBaggageRestrictionManagerProxy struct {
29	url string
30}
31
32func newHTTPBaggageRestrictionManagerProxy(hostPort, serviceName string) *httpBaggageRestrictionManagerProxy {
33	v := url.Values{}
34	v.Set("service", serviceName)
35	return &httpBaggageRestrictionManagerProxy{
36		url: fmt.Sprintf("http://%s/baggageRestrictions?%s", hostPort, v.Encode()),
37	}
38}
39
40func (s *httpBaggageRestrictionManagerProxy) GetBaggageRestrictions(serviceName string) ([]*thrift.BaggageRestriction, error) {
41	var out []*thrift.BaggageRestriction
42	if err := utils.GetJSON(s.url, &out); err != nil {
43		return nil, err
44	}
45	return out, nil
46}
47
48// RestrictionManager manages baggage restrictions by retrieving baggage restrictions from agent
49type RestrictionManager struct {
50	options
51
52	mux                sync.RWMutex
53	serviceName        string
54	restrictions       map[string]*baggage.Restriction
55	thriftProxy        thrift.BaggageRestrictionManager
56	pollStopped        sync.WaitGroup
57	stopPoll           chan struct{}
58	invalidRestriction *baggage.Restriction
59	validRestriction   *baggage.Restriction
60
61	// Determines if the manager has successfully retrieved baggage restrictions from agent
62	initialized bool
63}
64
65// NewRestrictionManager returns a BaggageRestrictionManager that polls the agent for the latest
66// baggage restrictions.
67func NewRestrictionManager(serviceName string, options ...Option) *RestrictionManager {
68	// TODO there is a developing use case where a single tracer can generate traces on behalf of many services.
69	// restrictionsMap will need to exist per service
70	opts := applyOptions(options...)
71	m := &RestrictionManager{
72		serviceName:        serviceName,
73		options:            opts,
74		restrictions:       make(map[string]*baggage.Restriction),
75		thriftProxy:        newHTTPBaggageRestrictionManagerProxy(opts.hostPort, serviceName),
76		stopPoll:           make(chan struct{}),
77		invalidRestriction: baggage.NewRestriction(false, 0),
78		validRestriction:   baggage.NewRestriction(true, defaultMaxValueLength),
79	}
80	m.pollStopped.Add(1)
81	go m.pollManager()
82	return m
83}
84
85// isReady returns true if the manager has retrieved baggage restrictions from the remote source.
86func (m *RestrictionManager) isReady() bool {
87	m.mux.RLock()
88	defer m.mux.RUnlock()
89	return m.initialized
90}
91
92// GetRestriction implements RestrictionManager#GetRestriction.
93func (m *RestrictionManager) GetRestriction(service, key string) *baggage.Restriction {
94	m.mux.RLock()
95	defer m.mux.RUnlock()
96	if !m.initialized {
97		if m.denyBaggageOnInitializationFailure {
98			return m.invalidRestriction
99		}
100		return m.validRestriction
101	}
102	if restriction, ok := m.restrictions[key]; ok {
103		return restriction
104	}
105	return m.invalidRestriction
106}
107
108// Close stops remote polling and closes the RemoteRestrictionManager.
109func (m *RestrictionManager) Close() error {
110	close(m.stopPoll)
111	m.pollStopped.Wait()
112	return nil
113}
114
115func (m *RestrictionManager) pollManager() {
116	defer m.pollStopped.Done()
117	// attempt to initialize baggage restrictions
118	if err := m.updateRestrictions(); err != nil {
119		m.logger.Error(fmt.Sprintf("Failed to initialize baggage restrictions: %s", err.Error()))
120	}
121	ticker := time.NewTicker(m.refreshInterval)
122	defer ticker.Stop()
123
124	for {
125		select {
126		case <-ticker.C:
127			if err := m.updateRestrictions(); err != nil {
128				m.logger.Error(fmt.Sprintf("Failed to update baggage restrictions: %s", err.Error()))
129			}
130		case <-m.stopPoll:
131			return
132		}
133	}
134}
135
136func (m *RestrictionManager) updateRestrictions() error {
137	restrictions, err := m.thriftProxy.GetBaggageRestrictions(m.serviceName)
138	if err != nil {
139		m.metrics.BaggageRestrictionsUpdateFailure.Inc(1)
140		return err
141	}
142	newRestrictions := m.parseRestrictions(restrictions)
143	m.metrics.BaggageRestrictionsUpdateSuccess.Inc(1)
144	m.mux.Lock()
145	defer m.mux.Unlock()
146	m.initialized = true
147	m.restrictions = newRestrictions
148	return nil
149}
150
151func (m *RestrictionManager) parseRestrictions(restrictions []*thrift.BaggageRestriction) map[string]*baggage.Restriction {
152	setters := make(map[string]*baggage.Restriction, len(restrictions))
153	for _, restriction := range restrictions {
154		setters[restriction.BaggageKey] = baggage.NewRestriction(true, int(restriction.MaxValueLength))
155	}
156	return setters
157}
158