1// Copyright 2018 The OPA Authors. All rights reserved. 2// Use of this source code is governed by an Apache2 3// license that can be found in the LICENSE file. 4 5// Package discovery implements configuration discovery. 6package discovery 7 8import ( 9 "context" 10 "encoding/json" 11 "fmt" 12 "sync" 13 14 "github.com/open-policy-agent/opa/metrics" 15 16 "github.com/sirupsen/logrus" 17 18 "github.com/open-policy-agent/opa/ast" 19 bundleApi "github.com/open-policy-agent/opa/bundle" 20 "github.com/open-policy-agent/opa/config" 21 "github.com/open-policy-agent/opa/download" 22 "github.com/open-policy-agent/opa/plugins" 23 "github.com/open-policy-agent/opa/plugins/bundle" 24 "github.com/open-policy-agent/opa/plugins/logs" 25 "github.com/open-policy-agent/opa/plugins/status" 26 "github.com/open-policy-agent/opa/rego" 27 "github.com/open-policy-agent/opa/storage/inmem" 28) 29 30// Name is the discovery plugin name that will be registered with the plugin manager. 31const Name = "discovery" 32 33// Discovery implements configuration discovery for OPA. When discovery is 34// started it will periodically download a configuration bundle and try to 35// reconfigure the OPA. 36type Discovery struct { 37 manager *plugins.Manager 38 config *Config 39 factories map[string]plugins.Factory 40 downloader *download.Downloader // discovery bundle downloader 41 status *bundle.Status // discovery status 42 etag string // discovery bundle etag for caching purposes 43 metrics metrics.Metrics 44 readyOnce sync.Once 45} 46 47// Factories provides a set of factory functions to use for 48// instantiating custom plugins. 49func Factories(fs map[string]plugins.Factory) func(*Discovery) { 50 return func(d *Discovery) { 51 d.factories = fs 52 } 53} 54 55// Metrics provides a metrics provider to pass to plugins. 56func Metrics(m metrics.Metrics) func(*Discovery) { 57 return func(d *Discovery) { 58 d.metrics = m 59 } 60} 61 62// New returns a new discovery plugin. 63func New(manager *plugins.Manager, opts ...func(*Discovery)) (*Discovery, error) { 64 65 result := &Discovery{ 66 manager: manager, 67 } 68 69 for _, f := range opts { 70 f(result) 71 } 72 73 config, err := ParseConfig(manager.Config.Discovery, manager.Services()) 74 75 if err != nil { 76 return nil, err 77 } else if config == nil { 78 if _, err := getPluginSet(result.factories, manager, manager.Config, result.metrics); err != nil { 79 return nil, err 80 } 81 return result, nil 82 } 83 84 if manager.Config.PluginsEnabled() { 85 return nil, fmt.Errorf("plugins cannot be specified in the bootstrap configuration when discovery enabled") 86 } 87 88 result.config = config 89 result.downloader = download.New(config.Config, manager.Client(config.service), config.path).WithCallback(result.oneShot) 90 result.status = &bundle.Status{ 91 Name: *config.Name, 92 } 93 94 manager.UpdatePluginStatus(Name, &plugins.Status{State: plugins.StateNotReady}) 95 return result, nil 96} 97 98// Start starts the dynamic discovery process if configured. 99func (c *Discovery) Start(ctx context.Context) error { 100 if c.downloader != nil { 101 c.downloader.Start(ctx) 102 } else { 103 // If there is no dynamic discovery then update the status to OK. 104 c.manager.UpdatePluginStatus(Name, &plugins.Status{State: plugins.StateOK}) 105 } 106 return nil 107} 108 109// Stop stops the dynamic discovery process if configured. 110func (c *Discovery) Stop(ctx context.Context) { 111 if c.downloader != nil { 112 c.downloader.Stop(ctx) 113 } 114 115 c.manager.UpdatePluginStatus(Name, &plugins.Status{State: plugins.StateNotReady}) 116} 117 118// Reconfigure is a no-op on discovery. 119func (c *Discovery) Reconfigure(_ context.Context, _ interface{}) { 120 121} 122 123func (c *Discovery) oneShot(ctx context.Context, u download.Update) { 124 125 c.processUpdate(ctx, u) 126 127 if p := status.Lookup(c.manager); p != nil { 128 p.UpdateDiscoveryStatus(*c.status) 129 } 130} 131 132func (c *Discovery) processUpdate(ctx context.Context, u download.Update) { 133 134 if u.Error != nil { 135 c.logError("Discovery download failed: %v", u.Error) 136 c.status.SetError(u.Error) 137 c.downloader.ClearCache() 138 return 139 } 140 141 if u.Bundle != nil { 142 c.status.SetDownloadSuccess() 143 144 if err := c.reconfigure(ctx, u); err != nil { 145 c.logError("Discovery reconfiguration error occurred: %v", err) 146 c.status.SetError(err) 147 c.downloader.ClearCache() 148 return 149 } 150 151 c.status.SetError(nil) 152 c.status.SetActivateSuccess(u.Bundle.Manifest.Revision) 153 154 // On the first activation success mark the plugin as being in OK state 155 c.readyOnce.Do(func() { 156 c.manager.UpdatePluginStatus(Name, &plugins.Status{State: plugins.StateOK}) 157 }) 158 159 if u.ETag != "" { 160 c.logInfo("Discovery update processed successfully. Etag updated to %v.", u.ETag) 161 } else { 162 c.logInfo("Discovery update processed successfully.") 163 } 164 c.etag = u.ETag 165 return 166 } 167 168 if u.ETag == c.etag { 169 c.logDebug("Discovery update skipped, server replied with not modified.") 170 c.status.SetError(nil) 171 return 172 } 173} 174 175func (c *Discovery) reconfigure(ctx context.Context, u download.Update) error { 176 177 config, ps, err := processBundle(ctx, c.manager, c.factories, u.Bundle, c.config.query, c.metrics) 178 if err != nil { 179 return err 180 } 181 182 if err := c.manager.Reconfigure(config); err != nil { 183 return err 184 } 185 186 // TODO(tsandall): we don't currently support changes to discovery 187 // configuration. These changes are risky because errors would be 188 // unrecoverable (without keeping track of changes and rolling back...) 189 190 // TODO(tsandall): add protection against discovery -service- changing. 191 for _, p := range ps.Start { 192 if err := p.Start(ctx); err != nil { 193 return err 194 } 195 } 196 197 for _, p := range ps.Reconfig { 198 p.Plugin.Reconfigure(ctx, p.Config) 199 } 200 201 return nil 202} 203 204func (c *Discovery) logError(fmt string, a ...interface{}) { 205 logrus.WithFields(c.logrusFields()).Errorf(fmt, a...) 206} 207 208func (c *Discovery) logInfo(fmt string, a ...interface{}) { 209 logrus.WithFields(c.logrusFields()).Infof(fmt, a...) 210} 211 212func (c *Discovery) logDebug(fmt string, a ...interface{}) { 213 logrus.WithFields(c.logrusFields()).Debugf(fmt, a...) 214} 215 216func (c *Discovery) logrusFields() logrus.Fields { 217 return logrus.Fields{ 218 "name": *c.config.Name, 219 "plugin": "discovery", 220 } 221} 222 223func processBundle(ctx context.Context, manager *plugins.Manager, factories map[string]plugins.Factory, b *bundleApi.Bundle, query string, m metrics.Metrics) (*config.Config, *pluginSet, error) { 224 225 config, err := evaluateBundle(ctx, manager.ID, manager.Info, b, query) 226 if err != nil { 227 return nil, nil, err 228 } 229 230 ps, err := getPluginSet(factories, manager, config, m) 231 return config, ps, err 232} 233 234func evaluateBundle(ctx context.Context, id string, info *ast.Term, b *bundleApi.Bundle, query string) (*config.Config, error) { 235 236 modules := b.ParsedModules("discovery") 237 238 compiler := ast.NewCompiler() 239 240 if compiler.Compile(modules); compiler.Failed() { 241 return nil, compiler.Errors 242 } 243 244 store := inmem.NewFromObject(b.Data) 245 246 rego := rego.New( 247 rego.Query(query), 248 rego.Compiler(compiler), 249 rego.Store(store), 250 rego.Runtime(info), 251 ) 252 253 rs, err := rego.Eval(ctx) 254 if err != nil { 255 return nil, err 256 } 257 258 if len(rs) == 0 { 259 return nil, fmt.Errorf("undefined configuration") 260 } 261 262 bs, err := json.Marshal(rs[0].Expressions[0].Value) 263 if err != nil { 264 return nil, err 265 } 266 267 return config.ParseConfig(bs, id) 268} 269 270type pluginSet struct { 271 Start []plugins.Plugin 272 Reconfig []pluginreconfig 273} 274 275type pluginreconfig struct { 276 Config interface{} 277 Plugin plugins.Plugin 278} 279 280type pluginfactory struct { 281 name string 282 factory plugins.Factory 283 config interface{} 284} 285 286func getPluginSet(factories map[string]plugins.Factory, manager *plugins.Manager, config *config.Config, m metrics.Metrics) (*pluginSet, error) { 287 288 // Parse and validate plugin configurations. 289 pluginNames := []string{} 290 pluginFactories := []pluginfactory{} 291 292 for k := range config.Plugins { 293 f, ok := factories[k] 294 if !ok { 295 return nil, fmt.Errorf("plugin %q not registered", k) 296 } 297 298 c, err := f.Validate(manager, config.Plugins[k]) 299 if err != nil { 300 return nil, err 301 } 302 303 pluginFactories = append(pluginFactories, pluginfactory{ 304 name: k, 305 factory: f, 306 config: c, 307 }) 308 309 pluginNames = append(pluginNames, k) 310 } 311 312 // Parse and validate bundle/logs/status configurations. 313 314 // If `bundle` was configured use that, otherwise try the new `bundles` option 315 bundleConfig, err := bundle.ParseConfig(config.Bundle, manager.Services()) 316 if err != nil { 317 return nil, err 318 } 319 if bundleConfig == nil { 320 bundleConfig, err = bundle.ParseBundlesConfig(config.Bundles, manager.Services()) 321 if err != nil { 322 return nil, err 323 } 324 } 325 326 decisionLogsConfig, err := logs.ParseConfig(config.DecisionLogs, manager.Services(), pluginNames) 327 if err != nil { 328 return nil, err 329 } 330 331 statusConfig, err := status.ParseConfig(config.Status, manager.Services()) 332 if err != nil { 333 return nil, err 334 } 335 336 // Accumulate plugins to start or reconfigure. 337 starts := []plugins.Plugin{} 338 reconfigs := []pluginreconfig{} 339 340 if bundleConfig != nil { 341 p, created := getBundlePlugin(manager, bundleConfig) 342 if created { 343 starts = append(starts, p) 344 } else if p != nil { 345 reconfigs = append(reconfigs, pluginreconfig{bundleConfig, p}) 346 } 347 } 348 349 if decisionLogsConfig != nil { 350 p, created := getDecisionLogsPlugin(manager, decisionLogsConfig) 351 if created { 352 starts = append(starts, p) 353 } else if p != nil { 354 reconfigs = append(reconfigs, pluginreconfig{decisionLogsConfig, p}) 355 } 356 } 357 358 if statusConfig != nil { 359 p, created := getStatusPlugin(manager, statusConfig, m) 360 if created { 361 starts = append(starts, p) 362 } else if p != nil { 363 reconfigs = append(reconfigs, pluginreconfig{statusConfig, p}) 364 } 365 } 366 367 result := &pluginSet{starts, reconfigs} 368 369 getCustomPlugins(manager, pluginFactories, result) 370 371 return result, nil 372} 373 374func getBundlePlugin(m *plugins.Manager, config *bundle.Config) (plugin *bundle.Plugin, created bool) { 375 plugin = bundle.Lookup(m) 376 if plugin == nil { 377 plugin = bundle.New(config, m) 378 m.Register(bundle.Name, plugin) 379 registerBundleStatusUpdates(m) 380 created = true 381 } 382 return plugin, created 383} 384 385func getDecisionLogsPlugin(m *plugins.Manager, config *logs.Config) (plugin *logs.Plugin, created bool) { 386 plugin = logs.Lookup(m) 387 if plugin == nil { 388 plugin = logs.New(config, m) 389 m.Register(logs.Name, plugin) 390 created = true 391 } 392 return plugin, created 393} 394 395func getStatusPlugin(m *plugins.Manager, config *status.Config, metrics metrics.Metrics) (plugin *status.Plugin, created bool) { 396 397 plugin = status.Lookup(m) 398 399 if plugin == nil { 400 plugin = status.New(config, m).WithMetrics(metrics) 401 m.Register(status.Name, plugin) 402 registerBundleStatusUpdates(m) 403 created = true 404 } 405 406 return plugin, created 407} 408 409func getCustomPlugins(manager *plugins.Manager, factories []pluginfactory, result *pluginSet) { 410 for _, pf := range factories { 411 if plugin := manager.Plugin(pf.name); plugin != nil { 412 result.Reconfig = append(result.Reconfig, pluginreconfig{pf.config, plugin}) 413 } else { 414 plugin := pf.factory.New(manager, pf.config) 415 manager.Register(pf.name, plugin) 416 result.Start = append(result.Start, plugin) 417 } 418 } 419} 420 421func registerBundleStatusUpdates(m *plugins.Manager) { 422 bp := bundle.Lookup(m) 423 sp := status.Lookup(m) 424 if bp == nil || sp == nil { 425 return 426 } 427 type pluginlistener string 428 429 // Depending on how the plugin was configured we will want to use different listeners 430 // for backwards compatibility. 431 if !bp.Config().IsMultiBundle() { 432 bp.Register(pluginlistener(status.Name), sp.UpdateBundleStatus) 433 } else { 434 bp.RegisterBulkListener(pluginlistener(status.Name), sp.BulkUpdateBundleStatus) 435 } 436} 437