1package manager
2
3import (
4	"context"
5	"encoding/json"
6	"errors"
7	"fmt"
8	"io"
9	"io/ioutil"
10	"net/http"
11	"net/url"
12	"os"
13	"path/filepath"
14	"sync"
15	"time"
16
17	"github.com/grafana/grafana-plugin-sdk-go/backend"
18
19	"github.com/grafana/grafana/pkg/infra/fs"
20	"github.com/grafana/grafana/pkg/infra/log"
21	"github.com/grafana/grafana/pkg/models"
22	"github.com/grafana/grafana/pkg/plugins"
23	"github.com/grafana/grafana/pkg/plugins/backendplugin"
24	"github.com/grafana/grafana/pkg/plugins/backendplugin/instrumentation"
25	"github.com/grafana/grafana/pkg/plugins/manager/installer"
26	"github.com/grafana/grafana/pkg/services/sqlstore"
27	"github.com/grafana/grafana/pkg/setting"
28	"github.com/grafana/grafana/pkg/util/errutil"
29	"github.com/grafana/grafana/pkg/util/proxyutil"
30)
31
const (
	// grafanaComURL is the grafana.com plugins API endpoint. It is not
	// referenced in this part of the file; presumably other code in the
	// package uses it.
	grafanaComURL = "https://grafana.com/api/plugins"
)
35
// Compile-time checks that *PluginManager implements each of the plugins
// interfaces listed below.
var _ plugins.Client = (*PluginManager)(nil)
var _ plugins.Store = (*PluginManager)(nil)
var _ plugins.PluginDashboardManager = (*PluginManager)(nil)
var _ plugins.StaticRouteResolver = (*PluginManager)(nil)
var _ plugins.CoreBackendRegistrar = (*PluginManager)(nil)
var _ plugins.RendererManager = (*PluginManager)(nil)
42
// PluginManager holds the registered plugins in memory, keyed by plugin ID,
// together with the collaborators it uses to load, validate and install them.
type PluginManager struct {
	cfg *setting.Cfg
	// requestValidator is consulted by CallResource and CheckHealth before
	// the plugin is invoked.
	requestValidator models.PluginRequestValidator
	sqlStore         *sqlstore.SQLStore
	// store maps plugin ID to the registered plugin. Access goes through
	// pluginsMu.
	store           map[string]*plugins.Plugin
	pluginInstaller plugins.Installer
	pluginLoader    plugins.Loader
	// pluginsMu guards store.
	pluginsMu sync.RWMutex
	log       log.Logger
}
53
54func ProvideService(cfg *setting.Cfg, requestValidator models.PluginRequestValidator, pluginLoader plugins.Loader,
55	sqlStore *sqlstore.SQLStore) (*PluginManager, error) {
56	pm := newManager(cfg, requestValidator, pluginLoader, sqlStore)
57	if err := pm.init(); err != nil {
58		return nil, err
59	}
60	return pm, nil
61}
62
63func newManager(cfg *setting.Cfg, pluginRequestValidator models.PluginRequestValidator, pluginLoader plugins.Loader,
64	sqlStore *sqlstore.SQLStore) *PluginManager {
65	return &PluginManager{
66		cfg:              cfg,
67		requestValidator: pluginRequestValidator,
68		sqlStore:         sqlStore,
69		pluginLoader:     pluginLoader,
70		store:            map[string]*plugins.Plugin{},
71		log:              log.New("plugin.manager"),
72		pluginInstaller:  installer.New(false, cfg.BuildVersion, newInstallerLogger("plugin.installer", true)),
73	}
74}
75
76func (m *PluginManager) init() error {
77	// create external plugin's path if not exists
78	exists, err := fs.Exists(m.cfg.PluginsPath)
79	if err != nil {
80		return err
81	}
82
83	if !exists {
84		if err = os.MkdirAll(m.cfg.PluginsPath, os.ModePerm); err != nil {
85			m.log.Error("Failed to create external plugins directory", "dir", m.cfg.PluginsPath, "error", err)
86		} else {
87			m.log.Debug("External plugins directory created", "dir", m.cfg.PluginsPath)
88		}
89	}
90
91	m.log.Info("Initialising plugins")
92
93	// install Core plugins
94	err = m.loadPlugins(m.corePluginPaths()...)
95	if err != nil {
96		return err
97	}
98
99	// install Bundled plugins
100	err = m.loadPlugins(m.cfg.BundledPluginsPath)
101	if err != nil {
102		return err
103	}
104
105	// install External plugins
106	err = m.loadPlugins(m.cfg.PluginsPath)
107	if err != nil {
108		return err
109	}
110
111	// install plugins from cfg.PluginSettings
112	err = m.loadPlugins(m.pluginSettingPaths()...)
113	if err != nil {
114		return err
115	}
116
117	return nil
118}
119
120func (m *PluginManager) Run(ctx context.Context) error {
121	<-ctx.Done()
122	m.shutdown(ctx)
123	return ctx.Err()
124}
125
126func (m *PluginManager) plugin(pluginID string) (*plugins.Plugin, bool) {
127	m.pluginsMu.RLock()
128	defer m.pluginsMu.RUnlock()
129	p, exists := m.store[pluginID]
130
131	if !exists || (p.IsDecommissioned()) {
132		return nil, false
133	}
134
135	return p, true
136}
137
138func (m *PluginManager) plugins() []*plugins.Plugin {
139	m.pluginsMu.RLock()
140	defer m.pluginsMu.RUnlock()
141
142	res := make([]*plugins.Plugin, 0)
143	for _, p := range m.store {
144		if !p.IsDecommissioned() {
145			res = append(res, p)
146		}
147	}
148
149	return res
150}
151
152func (m *PluginManager) loadPlugins(paths ...string) error {
153	if len(paths) == 0 {
154		return nil
155	}
156
157	var pluginPaths []string
158	for _, p := range paths {
159		if p != "" {
160			pluginPaths = append(pluginPaths, p)
161		}
162	}
163
164	loadedPlugins, err := m.pluginLoader.Load(pluginPaths, m.registeredPlugins())
165	if err != nil {
166		m.log.Error("Could not load plugins", "paths", pluginPaths, "err", err)
167		return err
168	}
169
170	for _, p := range loadedPlugins {
171		if err := m.registerAndStart(context.Background(), p); err != nil {
172			m.log.Error("Could not start plugin", "pluginId", p.ID, "err", err)
173		}
174	}
175
176	return nil
177}
178
179func (m *PluginManager) registeredPlugins() map[string]struct{} {
180	pluginsByID := make(map[string]struct{})
181	for _, p := range m.plugins() {
182		pluginsByID[p.ID] = struct{}{}
183	}
184
185	return pluginsByID
186}
187
188func (m *PluginManager) Renderer() *plugins.Plugin {
189	for _, p := range m.plugins() {
190		if p.IsRenderer() {
191			return p
192		}
193	}
194
195	return nil
196}
197
198func (m *PluginManager) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
199	plugin, exists := m.plugin(req.PluginContext.PluginID)
200	if !exists {
201		return nil, backendplugin.ErrPluginNotRegistered
202	}
203
204	var resp *backend.QueryDataResponse
205	err := instrumentation.InstrumentQueryDataRequest(req.PluginContext.PluginID, func() (innerErr error) {
206		resp, innerErr = plugin.QueryData(ctx, req)
207		return
208	})
209
210	if err != nil {
211		if errors.Is(err, backendplugin.ErrMethodNotImplemented) {
212			return nil, err
213		}
214
215		if errors.Is(err, backendplugin.ErrPluginUnavailable) {
216			return nil, err
217		}
218
219		return nil, errutil.Wrap("failed to query data", err)
220	}
221
222	for refID, res := range resp.Responses {
223		// set frame ref ID based on response ref ID
224		for _, f := range res.Frames {
225			if f.RefID == "" {
226				f.RefID = refID
227			}
228		}
229	}
230
231	return resp, err
232}
233
234func (m *PluginManager) CallResource(pCtx backend.PluginContext, reqCtx *models.ReqContext, path string) {
235	var dsURL string
236	if pCtx.DataSourceInstanceSettings != nil {
237		dsURL = pCtx.DataSourceInstanceSettings.URL
238	}
239
240	err := m.requestValidator.Validate(dsURL, reqCtx.Req)
241	if err != nil {
242		reqCtx.JsonApiErr(http.StatusForbidden, "Access denied", err)
243		return
244	}
245
246	clonedReq := reqCtx.Req.Clone(reqCtx.Req.Context())
247	rawURL := path
248	if clonedReq.URL.RawQuery != "" {
249		rawURL += "?" + clonedReq.URL.RawQuery
250	}
251	urlPath, err := url.Parse(rawURL)
252	if err != nil {
253		handleCallResourceError(err, reqCtx)
254		return
255	}
256	clonedReq.URL = urlPath
257	err = m.callResourceInternal(reqCtx.Resp, clonedReq, pCtx)
258	if err != nil {
259		handleCallResourceError(err, reqCtx)
260	}
261}
262
263func (m *PluginManager) callResourceInternal(w http.ResponseWriter, req *http.Request, pCtx backend.PluginContext) error {
264	p, exists := m.plugin(pCtx.PluginID)
265	if !exists {
266		return backendplugin.ErrPluginNotRegistered
267	}
268
269	keepCookieModel := keepCookiesJSONModel{}
270	if dis := pCtx.DataSourceInstanceSettings; dis != nil {
271		err := json.Unmarshal(dis.JSONData, &keepCookieModel)
272		if err != nil {
273			p.Logger().Error("Failed to to unpack JSONData in datasource instance settings", "err", err)
274		}
275	}
276
277	proxyutil.ClearCookieHeader(req, keepCookieModel.KeepCookies)
278	proxyutil.PrepareProxyRequest(req)
279
280	body, err := ioutil.ReadAll(req.Body)
281	if err != nil {
282		return fmt.Errorf("failed to read request body: %w", err)
283	}
284
285	crReq := &backend.CallResourceRequest{
286		PluginContext: pCtx,
287		Path:          req.URL.Path,
288		Method:        req.Method,
289		URL:           req.URL.String(),
290		Headers:       req.Header,
291		Body:          body,
292	}
293
294	return instrumentation.InstrumentCallResourceRequest(p.PluginID(), func() error {
295		childCtx, cancel := context.WithCancel(req.Context())
296		defer cancel()
297		stream := newCallResourceResponseStream(childCtx)
298
299		var wg sync.WaitGroup
300		wg.Add(1)
301
302		defer func() {
303			if err := stream.Close(); err != nil {
304				m.log.Warn("Failed to close stream", "err", err)
305			}
306			wg.Wait()
307		}()
308
309		var flushStreamErr error
310		go func() {
311			flushStreamErr = flushStream(p, stream, w)
312			wg.Done()
313		}()
314
315		if err := p.CallResource(req.Context(), crReq, stream); err != nil {
316			return err
317		}
318
319		return flushStreamErr
320	})
321}
322
323func handleCallResourceError(err error, reqCtx *models.ReqContext) {
324	if errors.Is(err, backendplugin.ErrPluginUnavailable) {
325		reqCtx.JsonApiErr(503, "Plugin unavailable", err)
326		return
327	}
328
329	if errors.Is(err, backendplugin.ErrMethodNotImplemented) {
330		reqCtx.JsonApiErr(404, "Not found", err)
331		return
332	}
333
334	reqCtx.JsonApiErr(500, "Failed to call resource", err)
335}
336
337func flushStream(plugin backendplugin.Plugin, stream callResourceClientResponseStream, w http.ResponseWriter) error {
338	processedStreams := 0
339
340	for {
341		resp, err := stream.Recv()
342		if errors.Is(err, io.EOF) {
343			if processedStreams == 0 {
344				return errors.New("received empty resource response")
345			}
346			return nil
347		}
348		if err != nil {
349			if processedStreams == 0 {
350				return errutil.Wrap("failed to receive response from resource call", err)
351			}
352
353			plugin.Logger().Error("Failed to receive response from resource call", "err", err)
354			return stream.Close()
355		}
356
357		// Expected that headers and status are only part of first stream
358		if processedStreams == 0 && resp.Headers != nil {
359			// Make sure a content type always is returned in response
360			if _, exists := resp.Headers["Content-Type"]; !exists {
361				resp.Headers["Content-Type"] = []string{"application/json"}
362			}
363
364			for k, values := range resp.Headers {
365				// Due to security reasons we don't want to forward
366				// cookies from a backend plugin to clients/browsers.
367				if k == "Set-Cookie" {
368					continue
369				}
370
371				for _, v := range values {
372					// TODO: Figure out if we should use Set here instead
373					// nolint:gocritic
374					w.Header().Add(k, v)
375				}
376			}
377
378			proxyutil.SetProxyResponseHeaders(w.Header())
379			w.WriteHeader(resp.Status)
380		}
381
382		if _, err := w.Write(resp.Body); err != nil {
383			plugin.Logger().Error("Failed to write resource response", "err", err)
384		}
385
386		if flusher, ok := w.(http.Flusher); ok {
387			flusher.Flush()
388		}
389		processedStreams++
390	}
391}
392
393func (m *PluginManager) CollectMetrics(ctx context.Context, pluginID string) (*backend.CollectMetricsResult, error) {
394	p, exists := m.plugin(pluginID)
395	if !exists {
396		return nil, backendplugin.ErrPluginNotRegistered
397	}
398
399	var resp *backend.CollectMetricsResult
400	err := instrumentation.InstrumentCollectMetrics(p.PluginID(), func() (innerErr error) {
401		resp, innerErr = p.CollectMetrics(ctx)
402		return
403	})
404	if err != nil {
405		return nil, err
406	}
407
408	return resp, nil
409}
410
411func (m *PluginManager) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
412	var dsURL string
413	if req.PluginContext.DataSourceInstanceSettings != nil {
414		dsURL = req.PluginContext.DataSourceInstanceSettings.URL
415	}
416
417	err := m.requestValidator.Validate(dsURL, nil)
418	if err != nil {
419		return &backend.CheckHealthResult{
420			Status:  http.StatusForbidden,
421			Message: "Access denied",
422		}, nil
423	}
424
425	p, exists := m.plugin(req.PluginContext.PluginID)
426	if !exists {
427		return nil, backendplugin.ErrPluginNotRegistered
428	}
429
430	var resp *backend.CheckHealthResult
431	err = instrumentation.InstrumentCheckHealthRequest(p.PluginID(), func() (innerErr error) {
432		resp, innerErr = p.CheckHealth(ctx, &backend.CheckHealthRequest{PluginContext: req.PluginContext})
433		return
434	})
435
436	if err != nil {
437		if errors.Is(err, backendplugin.ErrMethodNotImplemented) {
438			return nil, err
439		}
440
441		if errors.Is(err, backendplugin.ErrPluginUnavailable) {
442			return nil, err
443		}
444
445		return nil, errutil.Wrap("failed to check plugin health", backendplugin.ErrHealthCheckFailed)
446	}
447
448	return resp, nil
449}
450
451func (m *PluginManager) SubscribeStream(ctx context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) {
452	plugin, exists := m.plugin(req.PluginContext.PluginID)
453	if !exists {
454		return nil, backendplugin.ErrPluginNotRegistered
455	}
456
457	return plugin.SubscribeStream(ctx, req)
458}
459
460func (m *PluginManager) PublishStream(ctx context.Context, req *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) {
461	plugin, exists := m.plugin(req.PluginContext.PluginID)
462	if !exists {
463		return nil, backendplugin.ErrPluginNotRegistered
464	}
465
466	return plugin.PublishStream(ctx, req)
467}
468
469func (m *PluginManager) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error {
470	plugin, exists := m.plugin(req.PluginContext.PluginID)
471	if !exists {
472		return backendplugin.ErrPluginNotRegistered
473	}
474
475	return plugin.RunStream(ctx, req, sender)
476}
477
478func (m *PluginManager) isRegistered(pluginID string) bool {
479	p, exists := m.plugin(pluginID)
480	if !exists {
481		return false
482	}
483
484	return !p.IsDecommissioned()
485}
486
487func (m *PluginManager) LoadAndRegister(pluginID string, factory backendplugin.PluginFactoryFunc) error {
488	if m.isRegistered(pluginID) {
489		return fmt.Errorf("backend plugin %s already registered", pluginID)
490	}
491
492	pluginRootDir := pluginID
493	if pluginID == "stackdriver" {
494		pluginRootDir = "cloud-monitoring"
495	}
496
497	path := filepath.Join(m.cfg.StaticRootPath, "app/plugins/datasource", pluginRootDir)
498
499	p, err := m.pluginLoader.LoadWithFactory(path, factory)
500	if err != nil {
501		return err
502	}
503
504	err = m.register(p)
505	if err != nil {
506		return err
507	}
508
509	return nil
510}
511
512func (m *PluginManager) Routes() []*plugins.StaticRoute {
513	staticRoutes := make([]*plugins.StaticRoute, 0)
514
515	for _, p := range m.plugins() {
516		if p.StaticRoute() != nil {
517			staticRoutes = append(staticRoutes, p.StaticRoute())
518		}
519	}
520	return staticRoutes
521}
522
523func (m *PluginManager) registerAndStart(ctx context.Context, plugin *plugins.Plugin) error {
524	err := m.register(plugin)
525	if err != nil {
526		return err
527	}
528
529	if !m.isRegistered(plugin.ID) {
530		return fmt.Errorf("plugin %s is not registered", plugin.ID)
531	}
532
533	return m.start(ctx, plugin)
534}
535
536func (m *PluginManager) register(p *plugins.Plugin) error {
537	if m.isRegistered(p.ID) {
538		return fmt.Errorf("plugin %s is already registered", p.ID)
539	}
540
541	m.pluginsMu.Lock()
542	m.store[p.ID] = p
543	m.pluginsMu.Unlock()
544
545	if !p.IsCorePlugin() {
546		m.log.Info("Plugin registered", "pluginId", p.ID)
547	}
548
549	return nil
550}
551
552func (m *PluginManager) unregisterAndStop(ctx context.Context, p *plugins.Plugin) error {
553	m.log.Debug("Stopping plugin process", "pluginId", p.ID)
554	m.pluginsMu.Lock()
555	defer m.pluginsMu.Unlock()
556
557	if err := p.Decommission(); err != nil {
558		return err
559	}
560
561	if err := p.Stop(ctx); err != nil {
562		return err
563	}
564
565	delete(m.store, p.ID)
566
567	m.log.Debug("Plugin unregistered", "pluginId", p.ID)
568	return nil
569}
570
571// start starts a backend plugin process
572func (m *PluginManager) start(ctx context.Context, p *plugins.Plugin) error {
573	if !p.IsManaged() || !p.Backend || p.SignatureError != nil {
574		return nil
575	}
576
577	if !m.isRegistered(p.ID) {
578		return backendplugin.ErrPluginNotRegistered
579	}
580
581	if err := startPluginAndRestartKilledProcesses(ctx, p); err != nil {
582		return err
583	}
584
585	if !p.IsCorePlugin() {
586		p.Logger().Debug("Successfully started backend plugin process")
587	}
588
589	return nil
590}
591
592func startPluginAndRestartKilledProcesses(ctx context.Context, p *plugins.Plugin) error {
593	if err := p.Start(ctx); err != nil {
594		return err
595	}
596
597	go func(ctx context.Context, p *plugins.Plugin) {
598		if err := restartKilledProcess(ctx, p); err != nil {
599			p.Logger().Error("Attempt to restart killed plugin process failed", "error", err)
600		}
601	}(ctx, p)
602
603	return nil
604}
605
606func restartKilledProcess(ctx context.Context, p *plugins.Plugin) error {
607	ticker := time.NewTicker(time.Second * 1)
608
609	for {
610		select {
611		case <-ctx.Done():
612			if err := ctx.Err(); err != nil && !errors.Is(err, context.Canceled) {
613				return err
614			}
615			return nil
616		case <-ticker.C:
617			if p.IsDecommissioned() {
618				p.Logger().Debug("Plugin decommissioned")
619				return nil
620			}
621
622			if !p.Exited() {
623				continue
624			}
625
626			p.Logger().Debug("Restarting plugin")
627			if err := p.Start(ctx); err != nil {
628				p.Logger().Error("Failed to restart plugin", "error", err)
629				continue
630			}
631			p.Logger().Debug("Plugin restarted")
632		}
633	}
634}
635
636// shutdown stops all backend plugin processes
637func (m *PluginManager) shutdown(ctx context.Context) {
638	var wg sync.WaitGroup
639	for _, p := range m.plugins() {
640		wg.Add(1)
641		go func(p backendplugin.Plugin, ctx context.Context) {
642			defer wg.Done()
643			p.Logger().Debug("Stopping plugin")
644			if err := p.Stop(ctx); err != nil {
645				p.Logger().Error("Failed to stop plugin", "error", err)
646			}
647			p.Logger().Debug("Plugin stopped")
648		}(p, ctx)
649	}
650	wg.Wait()
651}
652
653// corePluginPaths provides a list of the Core plugin paths which need to be scanned on init()
654func (m *PluginManager) corePluginPaths() []string {
655	datasourcePaths := []string{
656		filepath.Join(m.cfg.StaticRootPath, "app/plugins/datasource/alertmanager"),
657		filepath.Join(m.cfg.StaticRootPath, "app/plugins/datasource/dashboard"),
658		filepath.Join(m.cfg.StaticRootPath, "app/plugins/datasource/jaeger"),
659		filepath.Join(m.cfg.StaticRootPath, "app/plugins/datasource/mixed"),
660		filepath.Join(m.cfg.StaticRootPath, "app/plugins/datasource/zipkin"),
661	}
662
663	panelsPath := filepath.Join(m.cfg.StaticRootPath, "app/plugins/panel")
664
665	return append(datasourcePaths, panelsPath)
666}
667
668// pluginSettingPaths provides a plugin paths defined in cfg.PluginSettings which need to be scanned on init()
669func (m *PluginManager) pluginSettingPaths() []string {
670	var pluginSettingDirs []string
671	for _, settings := range m.cfg.PluginSettings {
672		path, exists := settings["path"]
673		if !exists || path == "" {
674			continue
675		}
676		pluginSettingDirs = append(pluginSettingDirs, path)
677	}
678
679	return pluginSettingDirs
680}
681
// callResourceClientResponseStream is used for receiving resource call responses.
// It is the receiving side consumed by flushStream.
type callResourceClientResponseStream interface {
	// Recv returns the next response. flushStream treats an io.EOF error
	// as the regular end of the stream.
	Recv() (*backend.CallResourceResponse, error)
	// Close closes the stream.
	Close() error
}
687
// keepCookiesJSONModel is the part of a data source's JSONData that
// callResourceInternal decodes to find out which cookies to keep on a
// resource request.
type keepCookiesJSONModel struct {
	// KeepCookies is passed to proxyutil.ClearCookieHeader as the list of
	// cookies to keep.
	KeepCookies []string `json:"keepCookies"`
}
691
// callResourceResponseStream passes resource call responses from a sender
// (Send) to a receiver (Recv) over a channel.
type callResourceResponseStream struct {
	// ctx aborts pending Send and Recv calls once it is done.
	ctx context.Context
	// stream carries the responses; newCallResourceResponseStream creates
	// it unbuffered.
	stream chan *backend.CallResourceResponse
	// closed is set by Close. NOTE(review): it is read and written without
	// synchronisation — confirm callers never race Send/Close.
	closed bool
}
697
698func newCallResourceResponseStream(ctx context.Context) *callResourceResponseStream {
699	return &callResourceResponseStream{
700		ctx:    ctx,
701		stream: make(chan *backend.CallResourceResponse),
702	}
703}
704
705func (s *callResourceResponseStream) Send(res *backend.CallResourceResponse) error {
706	if s.closed {
707		return errors.New("cannot send to a closed stream")
708	}
709
710	select {
711	case <-s.ctx.Done():
712		return errors.New("cancelled")
713	case s.stream <- res:
714		return nil
715	}
716}
717
718func (s *callResourceResponseStream) Recv() (*backend.CallResourceResponse, error) {
719	select {
720	case <-s.ctx.Done():
721		return nil, s.ctx.Err()
722	case res, ok := <-s.stream:
723		if !ok {
724			return nil, io.EOF
725		}
726		return res, nil
727	}
728}
729
730func (s *callResourceResponseStream) Close() error {
731	if s.closed {
732		return errors.New("cannot close a closed stream")
733	}
734
735	close(s.stream)
736	s.closed = true
737	return nil
738}
739