1// Copyright 2018 The OPA Authors.  All rights reserved.
2// Use of this source code is governed by an Apache2
3// license that can be found in the LICENSE file.
4
5// Package discovery implements configuration discovery.
6package discovery
7
8import (
9	"context"
10	"encoding/json"
11	"fmt"
12	"sync"
13
14	"github.com/open-policy-agent/opa/metrics"
15
16	"github.com/sirupsen/logrus"
17
18	"github.com/open-policy-agent/opa/ast"
19	bundleApi "github.com/open-policy-agent/opa/bundle"
20	"github.com/open-policy-agent/opa/config"
21	"github.com/open-policy-agent/opa/download"
22	"github.com/open-policy-agent/opa/plugins"
23	"github.com/open-policy-agent/opa/plugins/bundle"
24	"github.com/open-policy-agent/opa/plugins/logs"
25	"github.com/open-policy-agent/opa/plugins/status"
26	"github.com/open-policy-agent/opa/rego"
27	"github.com/open-policy-agent/opa/storage/inmem"
28)
29
// Name is the discovery plugin name that will be registered with the plugin
// manager. The same name is used as the key when this plugin reports its
// status through the manager's UpdatePluginStatus.
const Name = "discovery"
32
// Discovery implements configuration discovery for OPA. When discovery is
// started it will periodically download a configuration bundle and try to
// reconfigure the OPA.
//
// When no discovery configuration is present, New leaves config, downloader
// and status unset; Start then only reports the plugin as being in OK state.
type Discovery struct {
	manager    *plugins.Manager           // plugin manager used for service clients, plugin registration and status reporting
	config     *Config                    // parsed discovery configuration; left nil by New when discovery is not configured
	factories  map[string]plugins.Factory // factories for custom plugins, keyed by plugin name
	downloader *download.Downloader       // discovery bundle downloader
	status     *bundle.Status             // discovery status
	etag       string                     // discovery bundle etag for caching purposes
	metrics    metrics.Metrics            // metrics provider forwarded to the status plugin when it is created
	readyOnce  sync.Once                  // ensures the switch to OK state happens only on the first successful activation
}
46
47// Factories provides a set of factory functions to use for
48// instantiating custom plugins.
49func Factories(fs map[string]plugins.Factory) func(*Discovery) {
50	return func(d *Discovery) {
51		d.factories = fs
52	}
53}
54
55// Metrics provides a metrics provider to pass to plugins.
56func Metrics(m metrics.Metrics) func(*Discovery) {
57	return func(d *Discovery) {
58		d.metrics = m
59	}
60}
61
62// New returns a new discovery plugin.
63func New(manager *plugins.Manager, opts ...func(*Discovery)) (*Discovery, error) {
64
65	result := &Discovery{
66		manager: manager,
67	}
68
69	for _, f := range opts {
70		f(result)
71	}
72
73	config, err := ParseConfig(manager.Config.Discovery, manager.Services())
74
75	if err != nil {
76		return nil, err
77	} else if config == nil {
78		if _, err := getPluginSet(result.factories, manager, manager.Config, result.metrics); err != nil {
79			return nil, err
80		}
81		return result, nil
82	}
83
84	if manager.Config.PluginsEnabled() {
85		return nil, fmt.Errorf("plugins cannot be specified in the bootstrap configuration when discovery enabled")
86	}
87
88	result.config = config
89	result.downloader = download.New(config.Config, manager.Client(config.service), config.path).WithCallback(result.oneShot)
90	result.status = &bundle.Status{
91		Name: *config.Name,
92	}
93
94	manager.UpdatePluginStatus(Name, &plugins.Status{State: plugins.StateNotReady})
95	return result, nil
96}
97
98// Start starts the dynamic discovery process if configured.
99func (c *Discovery) Start(ctx context.Context) error {
100	if c.downloader != nil {
101		c.downloader.Start(ctx)
102	} else {
103		// If there is no dynamic discovery then update the status to OK.
104		c.manager.UpdatePluginStatus(Name, &plugins.Status{State: plugins.StateOK})
105	}
106	return nil
107}
108
109// Stop stops the dynamic discovery process if configured.
110func (c *Discovery) Stop(ctx context.Context) {
111	if c.downloader != nil {
112		c.downloader.Stop(ctx)
113	}
114
115	c.manager.UpdatePluginStatus(Name, &plugins.Status{State: plugins.StateNotReady})
116}
117
118// Reconfigure is a no-op on discovery.
119func (c *Discovery) Reconfigure(_ context.Context, _ interface{}) {
120
121}
122
123func (c *Discovery) oneShot(ctx context.Context, u download.Update) {
124
125	c.processUpdate(ctx, u)
126
127	if p := status.Lookup(c.manager); p != nil {
128		p.UpdateDiscoveryStatus(*c.status)
129	}
130}
131
132func (c *Discovery) processUpdate(ctx context.Context, u download.Update) {
133
134	if u.Error != nil {
135		c.logError("Discovery download failed: %v", u.Error)
136		c.status.SetError(u.Error)
137		c.downloader.ClearCache()
138		return
139	}
140
141	if u.Bundle != nil {
142		c.status.SetDownloadSuccess()
143
144		if err := c.reconfigure(ctx, u); err != nil {
145			c.logError("Discovery reconfiguration error occurred: %v", err)
146			c.status.SetError(err)
147			c.downloader.ClearCache()
148			return
149		}
150
151		c.status.SetError(nil)
152		c.status.SetActivateSuccess(u.Bundle.Manifest.Revision)
153
154		// On the first activation success mark the plugin as being in OK state
155		c.readyOnce.Do(func() {
156			c.manager.UpdatePluginStatus(Name, &plugins.Status{State: plugins.StateOK})
157		})
158
159		if u.ETag != "" {
160			c.logInfo("Discovery update processed successfully. Etag updated to %v.", u.ETag)
161		} else {
162			c.logInfo("Discovery update processed successfully.")
163		}
164		c.etag = u.ETag
165		return
166	}
167
168	if u.ETag == c.etag {
169		c.logDebug("Discovery update skipped, server replied with not modified.")
170		c.status.SetError(nil)
171		return
172	}
173}
174
175func (c *Discovery) reconfigure(ctx context.Context, u download.Update) error {
176
177	config, ps, err := processBundle(ctx, c.manager, c.factories, u.Bundle, c.config.query, c.metrics)
178	if err != nil {
179		return err
180	}
181
182	if err := c.manager.Reconfigure(config); err != nil {
183		return err
184	}
185
186	// TODO(tsandall): we don't currently support changes to discovery
187	// configuration. These changes are risky because errors would be
188	// unrecoverable (without keeping track of changes and rolling back...)
189
190	// TODO(tsandall): add protection against discovery -service- changing.
191	for _, p := range ps.Start {
192		if err := p.Start(ctx); err != nil {
193			return err
194		}
195	}
196
197	for _, p := range ps.Reconfig {
198		p.Plugin.Reconfigure(ctx, p.Config)
199	}
200
201	return nil
202}
203
204func (c *Discovery) logError(fmt string, a ...interface{}) {
205	logrus.WithFields(c.logrusFields()).Errorf(fmt, a...)
206}
207
208func (c *Discovery) logInfo(fmt string, a ...interface{}) {
209	logrus.WithFields(c.logrusFields()).Infof(fmt, a...)
210}
211
212func (c *Discovery) logDebug(fmt string, a ...interface{}) {
213	logrus.WithFields(c.logrusFields()).Debugf(fmt, a...)
214}
215
216func (c *Discovery) logrusFields() logrus.Fields {
217	return logrus.Fields{
218		"name":   *c.config.Name,
219		"plugin": "discovery",
220	}
221}
222
223func processBundle(ctx context.Context, manager *plugins.Manager, factories map[string]plugins.Factory, b *bundleApi.Bundle, query string, m metrics.Metrics) (*config.Config, *pluginSet, error) {
224
225	config, err := evaluateBundle(ctx, manager.ID, manager.Info, b, query)
226	if err != nil {
227		return nil, nil, err
228	}
229
230	ps, err := getPluginSet(factories, manager, config, m)
231	return config, ps, err
232}
233
234func evaluateBundle(ctx context.Context, id string, info *ast.Term, b *bundleApi.Bundle, query string) (*config.Config, error) {
235
236	modules := b.ParsedModules("discovery")
237
238	compiler := ast.NewCompiler()
239
240	if compiler.Compile(modules); compiler.Failed() {
241		return nil, compiler.Errors
242	}
243
244	store := inmem.NewFromObject(b.Data)
245
246	rego := rego.New(
247		rego.Query(query),
248		rego.Compiler(compiler),
249		rego.Store(store),
250		rego.Runtime(info),
251	)
252
253	rs, err := rego.Eval(ctx)
254	if err != nil {
255		return nil, err
256	}
257
258	if len(rs) == 0 {
259		return nil, fmt.Errorf("undefined configuration")
260	}
261
262	bs, err := json.Marshal(rs[0].Expressions[0].Value)
263	if err != nil {
264		return nil, err
265	}
266
267	return config.ParseConfig(bs, id)
268}
269
// pluginSet is the result of resolving a configuration into plugins: those
// that were newly created and still have to be started, and those that
// already existed and have to be given their new configuration.
type pluginSet struct {
	Start    []plugins.Plugin // newly created plugins that have not been started yet
	Reconfig []pluginreconfig // existing plugins paired with the configuration to apply
}
274
// pluginreconfig pairs an already registered plugin with the validated
// configuration that should be passed to its Reconfigure method.
type pluginreconfig struct {
	Config interface{}    // validated, plugin-specific configuration
	Plugin plugins.Plugin // plugin to reconfigure
}
279
// pluginfactory bundles what is needed to look up or create a custom plugin:
// its name, the factory registered for it and the configuration value that
// the factory's Validate call returned.
type pluginfactory struct {
	name    string          // plugin name, as used in the configuration and the manager
	factory plugins.Factory // factory used to instantiate the plugin if it does not exist yet
	config  interface{}     // validated configuration returned by factory.Validate
}
285
286func getPluginSet(factories map[string]plugins.Factory, manager *plugins.Manager, config *config.Config, m metrics.Metrics) (*pluginSet, error) {
287
288	// Parse and validate plugin configurations.
289	pluginNames := []string{}
290	pluginFactories := []pluginfactory{}
291
292	for k := range config.Plugins {
293		f, ok := factories[k]
294		if !ok {
295			return nil, fmt.Errorf("plugin %q not registered", k)
296		}
297
298		c, err := f.Validate(manager, config.Plugins[k])
299		if err != nil {
300			return nil, err
301		}
302
303		pluginFactories = append(pluginFactories, pluginfactory{
304			name:    k,
305			factory: f,
306			config:  c,
307		})
308
309		pluginNames = append(pluginNames, k)
310	}
311
312	// Parse and validate bundle/logs/status configurations.
313
314	// If `bundle` was configured use that, otherwise try the new `bundles` option
315	bundleConfig, err := bundle.ParseConfig(config.Bundle, manager.Services())
316	if err != nil {
317		return nil, err
318	}
319	if bundleConfig == nil {
320		bundleConfig, err = bundle.ParseBundlesConfig(config.Bundles, manager.Services())
321		if err != nil {
322			return nil, err
323		}
324	}
325
326	decisionLogsConfig, err := logs.ParseConfig(config.DecisionLogs, manager.Services(), pluginNames)
327	if err != nil {
328		return nil, err
329	}
330
331	statusConfig, err := status.ParseConfig(config.Status, manager.Services())
332	if err != nil {
333		return nil, err
334	}
335
336	// Accumulate plugins to start or reconfigure.
337	starts := []plugins.Plugin{}
338	reconfigs := []pluginreconfig{}
339
340	if bundleConfig != nil {
341		p, created := getBundlePlugin(manager, bundleConfig)
342		if created {
343			starts = append(starts, p)
344		} else if p != nil {
345			reconfigs = append(reconfigs, pluginreconfig{bundleConfig, p})
346		}
347	}
348
349	if decisionLogsConfig != nil {
350		p, created := getDecisionLogsPlugin(manager, decisionLogsConfig)
351		if created {
352			starts = append(starts, p)
353		} else if p != nil {
354			reconfigs = append(reconfigs, pluginreconfig{decisionLogsConfig, p})
355		}
356	}
357
358	if statusConfig != nil {
359		p, created := getStatusPlugin(manager, statusConfig, m)
360		if created {
361			starts = append(starts, p)
362		} else if p != nil {
363			reconfigs = append(reconfigs, pluginreconfig{statusConfig, p})
364		}
365	}
366
367	result := &pluginSet{starts, reconfigs}
368
369	getCustomPlugins(manager, pluginFactories, result)
370
371	return result, nil
372}
373
374func getBundlePlugin(m *plugins.Manager, config *bundle.Config) (plugin *bundle.Plugin, created bool) {
375	plugin = bundle.Lookup(m)
376	if plugin == nil {
377		plugin = bundle.New(config, m)
378		m.Register(bundle.Name, plugin)
379		registerBundleStatusUpdates(m)
380		created = true
381	}
382	return plugin, created
383}
384
385func getDecisionLogsPlugin(m *plugins.Manager, config *logs.Config) (plugin *logs.Plugin, created bool) {
386	plugin = logs.Lookup(m)
387	if plugin == nil {
388		plugin = logs.New(config, m)
389		m.Register(logs.Name, plugin)
390		created = true
391	}
392	return plugin, created
393}
394
395func getStatusPlugin(m *plugins.Manager, config *status.Config, metrics metrics.Metrics) (plugin *status.Plugin, created bool) {
396
397	plugin = status.Lookup(m)
398
399	if plugin == nil {
400		plugin = status.New(config, m).WithMetrics(metrics)
401		m.Register(status.Name, plugin)
402		registerBundleStatusUpdates(m)
403		created = true
404	}
405
406	return plugin, created
407}
408
409func getCustomPlugins(manager *plugins.Manager, factories []pluginfactory, result *pluginSet) {
410	for _, pf := range factories {
411		if plugin := manager.Plugin(pf.name); plugin != nil {
412			result.Reconfig = append(result.Reconfig, pluginreconfig{pf.config, plugin})
413		} else {
414			plugin := pf.factory.New(manager, pf.config)
415			manager.Register(pf.name, plugin)
416			result.Start = append(result.Start, plugin)
417		}
418	}
419}
420
421func registerBundleStatusUpdates(m *plugins.Manager) {
422	bp := bundle.Lookup(m)
423	sp := status.Lookup(m)
424	if bp == nil || sp == nil {
425		return
426	}
427	type pluginlistener string
428
429	// Depending on how the plugin was configured we will want to use different listeners
430	// for backwards compatibility.
431	if !bp.Config().IsMultiBundle() {
432		bp.Register(pluginlistener(status.Name), sp.UpdateBundleStatus)
433	} else {
434		bp.RegisterBulkListener(pluginlistener(status.Name), sp.BulkUpdateBundleStatus)
435	}
436}
437