1package manager 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "io" 9 "io/ioutil" 10 "net/http" 11 "net/url" 12 "os" 13 "path/filepath" 14 "sync" 15 "time" 16 17 "github.com/grafana/grafana-plugin-sdk-go/backend" 18 19 "github.com/grafana/grafana/pkg/infra/fs" 20 "github.com/grafana/grafana/pkg/infra/log" 21 "github.com/grafana/grafana/pkg/models" 22 "github.com/grafana/grafana/pkg/plugins" 23 "github.com/grafana/grafana/pkg/plugins/backendplugin" 24 "github.com/grafana/grafana/pkg/plugins/backendplugin/instrumentation" 25 "github.com/grafana/grafana/pkg/plugins/manager/installer" 26 "github.com/grafana/grafana/pkg/services/sqlstore" 27 "github.com/grafana/grafana/pkg/setting" 28 "github.com/grafana/grafana/pkg/util/errutil" 29 "github.com/grafana/grafana/pkg/util/proxyutil" 30) 31 32const ( 33 grafanaComURL = "https://grafana.com/api/plugins" 34) 35 36var _ plugins.Client = (*PluginManager)(nil) 37var _ plugins.Store = (*PluginManager)(nil) 38var _ plugins.PluginDashboardManager = (*PluginManager)(nil) 39var _ plugins.StaticRouteResolver = (*PluginManager)(nil) 40var _ plugins.CoreBackendRegistrar = (*PluginManager)(nil) 41var _ plugins.RendererManager = (*PluginManager)(nil) 42 43type PluginManager struct { 44 cfg *setting.Cfg 45 requestValidator models.PluginRequestValidator 46 sqlStore *sqlstore.SQLStore 47 store map[string]*plugins.Plugin 48 pluginInstaller plugins.Installer 49 pluginLoader plugins.Loader 50 pluginsMu sync.RWMutex 51 log log.Logger 52} 53 54func ProvideService(cfg *setting.Cfg, requestValidator models.PluginRequestValidator, pluginLoader plugins.Loader, 55 sqlStore *sqlstore.SQLStore) (*PluginManager, error) { 56 pm := newManager(cfg, requestValidator, pluginLoader, sqlStore) 57 if err := pm.init(); err != nil { 58 return nil, err 59 } 60 return pm, nil 61} 62 63func newManager(cfg *setting.Cfg, pluginRequestValidator models.PluginRequestValidator, pluginLoader plugins.Loader, 64 sqlStore *sqlstore.SQLStore) *PluginManager { 65 return &PluginManager{ 66 cfg: cfg, 67 requestValidator: pluginRequestValidator, 68 sqlStore: sqlStore, 69 pluginLoader: pluginLoader, 70 store: map[string]*plugins.Plugin{}, 71 log: log.New("plugin.manager"), 72 pluginInstaller: installer.New(false, cfg.BuildVersion, newInstallerLogger("plugin.installer", true)), 73 } 74} 75 76func (m *PluginManager) init() error { 77 // create external plugin's path if not exists 78 exists, err := fs.Exists(m.cfg.PluginsPath) 79 if err != nil { 80 return err 81 } 82 83 if !exists { 84 if err = os.MkdirAll(m.cfg.PluginsPath, os.ModePerm); err != nil { 85 m.log.Error("Failed to create external plugins directory", "dir", m.cfg.PluginsPath, "error", err) 86 } else { 87 m.log.Debug("External plugins directory created", "dir", m.cfg.PluginsPath) 88 } 89 } 90 91 m.log.Info("Initialising plugins") 92 93 // install Core plugins 94 err = m.loadPlugins(m.corePluginPaths()...) 95 if err != nil { 96 return err 97 } 98 99 // install Bundled plugins 100 err = m.loadPlugins(m.cfg.BundledPluginsPath) 101 if err != nil { 102 return err 103 } 104 105 // install External plugins 106 err = m.loadPlugins(m.cfg.PluginsPath) 107 if err != nil { 108 return err 109 } 110 111 // install plugins from cfg.PluginSettings 112 err = m.loadPlugins(m.pluginSettingPaths()...) 113 if err != nil { 114 return err 115 } 116 117 return nil 118} 119 120func (m *PluginManager) Run(ctx context.Context) error { 121 <-ctx.Done() 122 m.shutdown(ctx) 123 return ctx.Err() 124} 125 126func (m *PluginManager) plugin(pluginID string) (*plugins.Plugin, bool) { 127 m.pluginsMu.RLock() 128 defer m.pluginsMu.RUnlock() 129 p, exists := m.store[pluginID] 130 131 if !exists || (p.IsDecommissioned()) { 132 return nil, false 133 } 134 135 return p, true 136} 137 138func (m *PluginManager) plugins() []*plugins.Plugin { 139 m.pluginsMu.RLock() 140 defer m.pluginsMu.RUnlock() 141 142 res := make([]*plugins.Plugin, 0) 143 for _, p := range m.store { 144 if !p.IsDecommissioned() { 145 res = append(res, p) 146 } 147 } 148 149 return res 150} 151 152func (m *PluginManager) loadPlugins(paths ...string) error { 153 if len(paths) == 0 { 154 return nil 155 } 156 157 var pluginPaths []string 158 for _, p := range paths { 159 if p != "" { 160 pluginPaths = append(pluginPaths, p) 161 } 162 } 163 164 loadedPlugins, err := m.pluginLoader.Load(pluginPaths, m.registeredPlugins()) 165 if err != nil { 166 m.log.Error("Could not load plugins", "paths", pluginPaths, "err", err) 167 return err 168 } 169 170 for _, p := range loadedPlugins { 171 if err := m.registerAndStart(context.Background(), p); err != nil { 172 m.log.Error("Could not start plugin", "pluginId", p.ID, "err", err) 173 } 174 } 175 176 return nil 177} 178 179func (m *PluginManager) registeredPlugins() map[string]struct{} { 180 pluginsByID := make(map[string]struct{}) 181 for _, p := range m.plugins() { 182 pluginsByID[p.ID] = struct{}{} 183 } 184 185 return pluginsByID 186} 187 188func (m *PluginManager) Renderer() *plugins.Plugin { 189 for _, p := range m.plugins() { 190 if p.IsRenderer() { 191 return p 192 } 193 } 194 195 return nil 196} 197 198func (m *PluginManager) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { 199 plugin, exists := m.plugin(req.PluginContext.PluginID) 200 if !exists { 201 return nil, backendplugin.ErrPluginNotRegistered 202 } 203 204 var resp *backend.QueryDataResponse 205 err := instrumentation.InstrumentQueryDataRequest(req.PluginContext.PluginID, func() (innerErr error) { 206 resp, innerErr = plugin.QueryData(ctx, req) 207 return 208 }) 209 210 if err != nil { 211 if errors.Is(err, backendplugin.ErrMethodNotImplemented) { 212 return nil, err 213 } 214 215 if errors.Is(err, backendplugin.ErrPluginUnavailable) { 216 return nil, err 217 } 218 219 return nil, errutil.Wrap("failed to query data", err) 220 } 221 222 for refID, res := range resp.Responses { 223 // set frame ref ID based on response ref ID 224 for _, f := range res.Frames { 225 if f.RefID == "" { 226 f.RefID = refID 227 } 228 } 229 } 230 231 return resp, err 232} 233 234func (m *PluginManager) CallResource(pCtx backend.PluginContext, reqCtx *models.ReqContext, path string) { 235 var dsURL string 236 if pCtx.DataSourceInstanceSettings != nil { 237 dsURL = pCtx.DataSourceInstanceSettings.URL 238 } 239 240 err := m.requestValidator.Validate(dsURL, reqCtx.Req) 241 if err != nil { 242 reqCtx.JsonApiErr(http.StatusForbidden, "Access denied", err) 243 return 244 } 245 246 clonedReq := reqCtx.Req.Clone(reqCtx.Req.Context()) 247 rawURL := path 248 if clonedReq.URL.RawQuery != "" { 249 rawURL += "?" + clonedReq.URL.RawQuery 250 } 251 urlPath, err := url.Parse(rawURL) 252 if err != nil { 253 handleCallResourceError(err, reqCtx) 254 return 255 } 256 clonedReq.URL = urlPath 257 err = m.callResourceInternal(reqCtx.Resp, clonedReq, pCtx) 258 if err != nil { 259 handleCallResourceError(err, reqCtx) 260 } 261} 262 263func (m *PluginManager) callResourceInternal(w http.ResponseWriter, req *http.Request, pCtx backend.PluginContext) error { 264 p, exists := m.plugin(pCtx.PluginID) 265 if !exists { 266 return backendplugin.ErrPluginNotRegistered 267 } 268 269 keepCookieModel := keepCookiesJSONModel{} 270 if dis := pCtx.DataSourceInstanceSettings; dis != nil { 271 err := json.Unmarshal(dis.JSONData, &keepCookieModel) 272 if err != nil { 273 p.Logger().Error("Failed to to unpack JSONData in datasource instance settings", "err", err) 274 } 275 } 276 277 proxyutil.ClearCookieHeader(req, keepCookieModel.KeepCookies) 278 proxyutil.PrepareProxyRequest(req) 279 280 body, err := ioutil.ReadAll(req.Body) 281 if err != nil { 282 return fmt.Errorf("failed to read request body: %w", err) 283 } 284 285 crReq := &backend.CallResourceRequest{ 286 PluginContext: pCtx, 287 Path: req.URL.Path, 288 Method: req.Method, 289 URL: req.URL.String(), 290 Headers: req.Header, 291 Body: body, 292 } 293 294 return instrumentation.InstrumentCallResourceRequest(p.PluginID(), func() error { 295 childCtx, cancel := context.WithCancel(req.Context()) 296 defer cancel() 297 stream := newCallResourceResponseStream(childCtx) 298 299 var wg sync.WaitGroup 300 wg.Add(1) 301 302 defer func() { 303 if err := stream.Close(); err != nil { 304 m.log.Warn("Failed to close stream", "err", err) 305 } 306 wg.Wait() 307 }() 308 309 var flushStreamErr error 310 go func() { 311 flushStreamErr = flushStream(p, stream, w) 312 wg.Done() 313 }() 314 315 if err := p.CallResource(req.Context(), crReq, stream); err != nil { 316 return err 317 } 318 319 return flushStreamErr 320 }) 321} 322 323func handleCallResourceError(err error, reqCtx *models.ReqContext) { 324 if errors.Is(err, backendplugin.ErrPluginUnavailable) { 325 reqCtx.JsonApiErr(503, "Plugin unavailable", err) 326 return 327 } 328 329 if errors.Is(err, backendplugin.ErrMethodNotImplemented) { 330 reqCtx.JsonApiErr(404, "Not found", err) 331 return 332 } 333 334 reqCtx.JsonApiErr(500, "Failed to call resource", err) 335} 336 337func flushStream(plugin backendplugin.Plugin, stream callResourceClientResponseStream, w http.ResponseWriter) error { 338 processedStreams := 0 339 340 for { 341 resp, err := stream.Recv() 342 if errors.Is(err, io.EOF) { 343 if processedStreams == 0 { 344 return errors.New("received empty resource response") 345 } 346 return nil 347 } 348 if err != nil { 349 if processedStreams == 0 { 350 return errutil.Wrap("failed to receive response from resource call", err) 351 } 352 353 plugin.Logger().Error("Failed to receive response from resource call", "err", err) 354 return stream.Close() 355 } 356 357 // Expected that headers and status are only part of first stream 358 if processedStreams == 0 && resp.Headers != nil { 359 // Make sure a content type always is returned in response 360 if _, exists := resp.Headers["Content-Type"]; !exists { 361 resp.Headers["Content-Type"] = []string{"application/json"} 362 } 363 364 for k, values := range resp.Headers { 365 // Due to security reasons we don't want to forward 366 // cookies from a backend plugin to clients/browsers. 367 if k == "Set-Cookie" { 368 continue 369 } 370 371 for _, v := range values { 372 // TODO: Figure out if we should use Set here instead 373 // nolint:gocritic 374 w.Header().Add(k, v) 375 } 376 } 377 378 proxyutil.SetProxyResponseHeaders(w.Header()) 379 w.WriteHeader(resp.Status) 380 } 381 382 if _, err := w.Write(resp.Body); err != nil { 383 plugin.Logger().Error("Failed to write resource response", "err", err) 384 } 385 386 if flusher, ok := w.(http.Flusher); ok { 387 flusher.Flush() 388 } 389 processedStreams++ 390 } 391} 392 393func (m *PluginManager) CollectMetrics(ctx context.Context, pluginID string) (*backend.CollectMetricsResult, error) { 394 p, exists := m.plugin(pluginID) 395 if !exists { 396 return nil, backendplugin.ErrPluginNotRegistered 397 } 398 399 var resp *backend.CollectMetricsResult 400 err := instrumentation.InstrumentCollectMetrics(p.PluginID(), func() (innerErr error) { 401 resp, innerErr = p.CollectMetrics(ctx) 402 return 403 }) 404 if err != nil { 405 return nil, err 406 } 407 408 return resp, nil 409} 410 411func (m *PluginManager) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { 412 var dsURL string 413 if req.PluginContext.DataSourceInstanceSettings != nil { 414 dsURL = req.PluginContext.DataSourceInstanceSettings.URL 415 } 416 417 err := m.requestValidator.Validate(dsURL, nil) 418 if err != nil { 419 return &backend.CheckHealthResult{ 420 Status: http.StatusForbidden, 421 Message: "Access denied", 422 }, nil 423 } 424 425 p, exists := m.plugin(req.PluginContext.PluginID) 426 if !exists { 427 return nil, backendplugin.ErrPluginNotRegistered 428 } 429 430 var resp *backend.CheckHealthResult 431 err = instrumentation.InstrumentCheckHealthRequest(p.PluginID(), func() (innerErr error) { 432 resp, innerErr = p.CheckHealth(ctx, &backend.CheckHealthRequest{PluginContext: req.PluginContext}) 433 return 434 }) 435 436 if err != nil { 437 if errors.Is(err, backendplugin.ErrMethodNotImplemented) { 438 return nil, err 439 } 440 441 if errors.Is(err, backendplugin.ErrPluginUnavailable) { 442 return nil, err 443 } 444 445 return nil, errutil.Wrap("failed to check plugin health", backendplugin.ErrHealthCheckFailed) 446 } 447 448 return resp, nil 449} 450 451func (m *PluginManager) SubscribeStream(ctx context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) { 452 plugin, exists := m.plugin(req.PluginContext.PluginID) 453 if !exists { 454 return nil, backendplugin.ErrPluginNotRegistered 455 } 456 457 return plugin.SubscribeStream(ctx, req) 458} 459 460func (m *PluginManager) PublishStream(ctx context.Context, req *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) { 461 plugin, exists := m.plugin(req.PluginContext.PluginID) 462 if !exists { 463 return nil, backendplugin.ErrPluginNotRegistered 464 } 465 466 return plugin.PublishStream(ctx, req) 467} 468 469func (m *PluginManager) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error { 470 plugin, exists := m.plugin(req.PluginContext.PluginID) 471 if !exists { 472 return backendplugin.ErrPluginNotRegistered 473 } 474 475 return plugin.RunStream(ctx, req, sender) 476} 477 478func (m *PluginManager) isRegistered(pluginID string) bool { 479 p, exists := m.plugin(pluginID) 480 if !exists { 481 return false 482 } 483 484 return !p.IsDecommissioned() 485} 486 487func (m *PluginManager) LoadAndRegister(pluginID string, factory backendplugin.PluginFactoryFunc) error { 488 if m.isRegistered(pluginID) { 489 return fmt.Errorf("backend plugin %s already registered", pluginID) 490 } 491 492 pluginRootDir := pluginID 493 if pluginID == "stackdriver" { 494 pluginRootDir = "cloud-monitoring" 495 } 496 497 path := filepath.Join(m.cfg.StaticRootPath, "app/plugins/datasource", pluginRootDir) 498 499 p, err := m.pluginLoader.LoadWithFactory(path, factory) 500 if err != nil { 501 return err 502 } 503 504 err = m.register(p) 505 if err != nil { 506 return err 507 } 508 509 return nil 510} 511 512func (m *PluginManager) Routes() []*plugins.StaticRoute { 513 staticRoutes := make([]*plugins.StaticRoute, 0) 514 515 for _, p := range m.plugins() { 516 if p.StaticRoute() != nil { 517 staticRoutes = append(staticRoutes, p.StaticRoute()) 518 } 519 } 520 return staticRoutes 521} 522 523func (m *PluginManager) registerAndStart(ctx context.Context, plugin *plugins.Plugin) error { 524 err := m.register(plugin) 525 if err != nil { 526 return err 527 } 528 529 if !m.isRegistered(plugin.ID) { 530 return fmt.Errorf("plugin %s is not registered", plugin.ID) 531 } 532 533 return m.start(ctx, plugin) 534} 535 536func (m *PluginManager) register(p *plugins.Plugin) error { 537 if m.isRegistered(p.ID) { 538 return fmt.Errorf("plugin %s is already registered", p.ID) 539 } 540 541 m.pluginsMu.Lock() 542 m.store[p.ID] = p 543 m.pluginsMu.Unlock() 544 545 if !p.IsCorePlugin() { 546 m.log.Info("Plugin registered", "pluginId", p.ID) 547 } 548 549 return nil 550} 551 552func (m *PluginManager) unregisterAndStop(ctx context.Context, p *plugins.Plugin) error { 553 m.log.Debug("Stopping plugin process", "pluginId", p.ID) 554 m.pluginsMu.Lock() 555 defer m.pluginsMu.Unlock() 556 557 if err := p.Decommission(); err != nil { 558 return err 559 } 560 561 if err := p.Stop(ctx); err != nil { 562 return err 563 } 564 565 delete(m.store, p.ID) 566 567 m.log.Debug("Plugin unregistered", "pluginId", p.ID) 568 return nil 569} 570 571// start starts a backend plugin process 572func (m *PluginManager) start(ctx context.Context, p *plugins.Plugin) error { 573 if !p.IsManaged() || !p.Backend || p.SignatureError != nil { 574 return nil 575 } 576 577 if !m.isRegistered(p.ID) { 578 return backendplugin.ErrPluginNotRegistered 579 } 580 581 if err := startPluginAndRestartKilledProcesses(ctx, p); err != nil { 582 return err 583 } 584 585 if !p.IsCorePlugin() { 586 p.Logger().Debug("Successfully started backend plugin process") 587 } 588 589 return nil 590} 591 592func startPluginAndRestartKilledProcesses(ctx context.Context, p *plugins.Plugin) error { 593 if err := p.Start(ctx); err != nil { 594 return err 595 } 596 597 go func(ctx context.Context, p *plugins.Plugin) { 598 if err := restartKilledProcess(ctx, p); err != nil { 599 p.Logger().Error("Attempt to restart killed plugin process failed", "error", err) 600 } 601 }(ctx, p) 602 603 return nil 604} 605 606func restartKilledProcess(ctx context.Context, p *plugins.Plugin) error { 607 ticker := time.NewTicker(time.Second * 1) 608 609 for { 610 select { 611 case <-ctx.Done(): 612 if err := ctx.Err(); err != nil && !errors.Is(err, context.Canceled) { 613 return err 614 } 615 return nil 616 case <-ticker.C: 617 if p.IsDecommissioned() { 618 p.Logger().Debug("Plugin decommissioned") 619 return nil 620 } 621 622 if !p.Exited() { 623 continue 624 } 625 626 p.Logger().Debug("Restarting plugin") 627 if err := p.Start(ctx); err != nil { 628 p.Logger().Error("Failed to restart plugin", "error", err) 629 continue 630 } 631 p.Logger().Debug("Plugin restarted") 632 } 633 } 634} 635 636// shutdown stops all backend plugin processes 637func (m *PluginManager) shutdown(ctx context.Context) { 638 var wg sync.WaitGroup 639 for _, p := range m.plugins() { 640 wg.Add(1) 641 go func(p backendplugin.Plugin, ctx context.Context) { 642 defer wg.Done() 643 p.Logger().Debug("Stopping plugin") 644 if err := p.Stop(ctx); err != nil { 645 p.Logger().Error("Failed to stop plugin", "error", err) 646 } 647 p.Logger().Debug("Plugin stopped") 648 }(p, ctx) 649 } 650 wg.Wait() 651} 652 653// corePluginPaths provides a list of the Core plugin paths which need to be scanned on init() 654func (m *PluginManager) corePluginPaths() []string { 655 datasourcePaths := []string{ 656 filepath.Join(m.cfg.StaticRootPath, "app/plugins/datasource/alertmanager"), 657 filepath.Join(m.cfg.StaticRootPath, "app/plugins/datasource/dashboard"), 658 filepath.Join(m.cfg.StaticRootPath, "app/plugins/datasource/jaeger"), 659 filepath.Join(m.cfg.StaticRootPath, "app/plugins/datasource/mixed"), 660 filepath.Join(m.cfg.StaticRootPath, "app/plugins/datasource/zipkin"), 661 } 662 663 panelsPath := filepath.Join(m.cfg.StaticRootPath, "app/plugins/panel") 664 665 return append(datasourcePaths, panelsPath) 666} 667 668// pluginSettingPaths provides a plugin paths defined in cfg.PluginSettings which need to be scanned on init() 669func (m *PluginManager) pluginSettingPaths() []string { 670 var pluginSettingDirs []string 671 for _, settings := range m.cfg.PluginSettings { 672 path, exists := settings["path"] 673 if !exists || path == "" { 674 continue 675 } 676 pluginSettingDirs = append(pluginSettingDirs, path) 677 } 678 679 return pluginSettingDirs 680} 681 682// callResourceClientResponseStream is used for receiving resource call responses. 683type callResourceClientResponseStream interface { 684 Recv() (*backend.CallResourceResponse, error) 685 Close() error 686} 687 688type keepCookiesJSONModel struct { 689 KeepCookies []string `json:"keepCookies"` 690} 691 692type callResourceResponseStream struct { 693 ctx context.Context 694 stream chan *backend.CallResourceResponse 695 closed bool 696} 697 698func newCallResourceResponseStream(ctx context.Context) *callResourceResponseStream { 699 return &callResourceResponseStream{ 700 ctx: ctx, 701 stream: make(chan *backend.CallResourceResponse), 702 } 703} 704 705func (s *callResourceResponseStream) Send(res *backend.CallResourceResponse) error { 706 if s.closed { 707 return errors.New("cannot send to a closed stream") 708 } 709 710 select { 711 case <-s.ctx.Done(): 712 return errors.New("cancelled") 713 case s.stream <- res: 714 return nil 715 } 716} 717 718func (s *callResourceResponseStream) Recv() (*backend.CallResourceResponse, error) { 719 select { 720 case <-s.ctx.Done(): 721 return nil, s.ctx.Err() 722 case res, ok := <-s.stream: 723 if !ok { 724 return nil, io.EOF 725 } 726 return res, nil 727 } 728} 729 730func (s *callResourceResponseStream) Close() error { 731 if s.closed { 732 return errors.New("cannot close a closed stream") 733 } 734 735 close(s.stream) 736 s.closed = true 737 return nil 738} 739