1package runstream
2
3import (
4	"context"
5	"errors"
6	"fmt"
7	"math"
8	"sync"
9	"time"
10
11	"github.com/grafana/grafana/pkg/infra/log"
12	"github.com/grafana/grafana/pkg/models"
13
14	"github.com/grafana/grafana-plugin-sdk-go/backend"
15)
16
var (
	// logger is the package-level logger for the runstream service.
	logger = log.New("live.runstream")
)
20
21//go:generate mockgen -destination=mock.go -package=runstream github.com/grafana/grafana/pkg/services/live/runstream ChannelLocalPublisher,NumLocalSubscribersGetter,StreamRunner,PluginContextGetter
22
// ChannelLocalPublisher publishes data to channel subscribers connected to the
// current node.
type ChannelLocalPublisher interface {
	// PublishLocal delivers data to local subscribers of channel.
	PublishLocal(channel string, data []byte) error
}
26
// PluginContextGetter resolves the current backend.PluginContext for a user,
// plugin and (optionally) a datasource identified by UID.
type PluginContextGetter interface {
	// GetPluginContext returns the plugin context, a boolean telling whether
	// it was found, and an error. skipCache forces a fresh lookup.
	GetPluginContext(user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error)
}
30
// NumLocalSubscribersGetter reports how many subscribers a channel has on the
// current node.
type NumLocalSubscribersGetter interface {
	// GetNumLocalSubscribers returns the number of channel subscribers on the
	// current node only.
	GetNumLocalSubscribers(channel string) (int, error)
}
35
// StreamRunner is implemented by plugins capable of running a long-lived
// stream for a channel path.
type StreamRunner interface {
	// RunStream runs the stream until ctx is canceled or the stream finishes.
	RunStream(ctx context.Context, request *backend.RunStreamRequest, sender *backend.StreamSender) error
}
39
// packetSender adapts a ChannelLocalPublisher to the backend stream sender
// contract, forwarding every packet to a fixed channel.
type packetSender struct {
	channelLocalPublisher ChannelLocalPublisher
	channel               string
}
44
45func (p *packetSender) Send(packet *backend.StreamPacket) error {
46	return p.channelLocalPublisher.PublishLocal(p.channel, packet.Data)
47}
48
// Manager manages streams from Grafana to plugins (i.e. RunStream method).
type Manager struct {
	mu                      sync.RWMutex                   // guards streams and datasourceStreams
	baseCtx                 context.Context                // set by Run; used to re-submit streams on datasource update
	streams                 map[string]streamContext       // active streams keyed by channel
	datasourceStreams       map[string]map[string]struct{} // channels per datasource key (see datasourceKey)
	presenceGetter          NumLocalSubscribersGetter      // used to detect channels without subscribers
	pluginContextGetter     PluginContextGetter            // used to refresh plugin context on reconnect
	channelSender           ChannelLocalPublisher          // destination for stream packets
	registerCh              chan submitRequest             // submit requests handled by Run loop
	closedCh                chan struct{}                  // closed when Run exits
	checkInterval           time.Duration                  // presence check period
	maxChecks               int                            // consecutive empty presence checks before stop
	datasourceCheckInterval time.Duration                  // datasource existence/update check period
}
64
// ManagerOption modifies Manager behavior (used for tests for example).
// Options are applied by NewManager after defaults are set.
type ManagerOption func(*Manager)
67
68// WithCheckConfig allows setting custom check rules.
69func WithCheckConfig(interval time.Duration, maxChecks int) ManagerOption {
70	return func(sm *Manager) {
71		sm.checkInterval = interval
72		sm.maxChecks = maxChecks
73	}
74}
75
// Defaults used by NewManager unless overridden through ManagerOption.
const (
	defaultCheckInterval           = 5 * time.Second  // presence (subscriber count) check period
	defaultDatasourceCheckInterval = 60 * time.Second // datasource existence/update check period
	defaultMaxChecks               = 3                // empty presence checks before stopping a stream
)
81
82// NewManager creates new Manager.
83func NewManager(channelSender ChannelLocalPublisher, presenceGetter NumLocalSubscribersGetter, pluginContextGetter PluginContextGetter, opts ...ManagerOption) *Manager {
84	sm := &Manager{
85		streams:                 make(map[string]streamContext),
86		datasourceStreams:       map[string]map[string]struct{}{},
87		channelSender:           channelSender,
88		presenceGetter:          presenceGetter,
89		pluginContextGetter:     pluginContextGetter,
90		registerCh:              make(chan submitRequest),
91		closedCh:                make(chan struct{}),
92		checkInterval:           defaultCheckInterval,
93		maxChecks:               defaultMaxChecks,
94		datasourceCheckInterval: defaultDatasourceCheckInterval,
95	}
96	for _, opt := range opts {
97		opt(sm)
98	}
99	return sm
100}
101
// HandleDatasourceDelete stops all streams bound to the deleted datasource
// without re-submitting them.
func (s *Manager) HandleDatasourceDelete(orgID int64, dsUID string) error {
	return s.handleDatasourceEvent(orgID, dsUID, false)
}
105
// HandleDatasourceUpdate stops all streams bound to the updated datasource and
// re-submits them so they run with a refreshed plugin context.
func (s *Manager) HandleDatasourceUpdate(orgID int64, dsUID string) error {
	return s.handleDatasourceEvent(orgID, dsUID, true)
}
109
// handleDatasourceEvent cancels every stream attached to the given datasource,
// waits until each of them has fully stopped, and (when resubmit is true)
// submits them again through SubmitStream so they are re-established.
func (s *Manager) handleDatasourceEvent(orgID int64, dsUID string, resubmit bool) error {
	dsKey := datasourceKey(orgID, dsUID)
	s.mu.RLock()
	dsStreams, ok := s.datasourceStreams[dsKey]
	if !ok {
		// No streams registered for this datasource — nothing to do.
		s.mu.RUnlock()
		return nil
	}
	var resubmitRequests []streamRequest
	var waitChannels []chan struct{}
	for channel := range dsStreams {
		streamCtx, ok := s.streams[channel]
		if !ok {
			continue
		}
		// Cancel the stream's context; the actual cleanup (map removal and
		// closing CloseCh) happens in stopStream via runStream's defer.
		streamCtx.cancelFn()
		waitChannels = append(waitChannels, streamCtx.CloseCh)
		resubmitRequests = append(resubmitRequests, streamCtx.streamRequest)
	}
	s.mu.RUnlock()

	// Wait for all streams to stop. The read lock must be released above,
	// since stopStream needs the write lock before CloseCh gets closed.
	for _, ch := range waitChannels {
		<-ch
	}

	if resubmit {
		// Re-submit streams.
		for _, sr := range resubmitRequests {
			_, err := s.SubmitStream(s.baseCtx, sr.user, sr.Channel, sr.Path, sr.PluginContext, sr.StreamRunner, true)
			if err != nil {
				// Log error but do not prevent execution of caller routine.
				logger.Error("Error re-submitting stream", "path", sr.Path, "error", err)
			}
		}
	}

	return nil
}
149
// datasourceKey builds the map key identifying a datasource: "<orgID>_<dsUID>".
func datasourceKey(orgID int64, dsUID string) string {
	prefix := fmt.Sprintf("%d", orgID)
	return prefix + "_" + dsUID
}
153
// stopStream removes the stream from the registries, cancels its context and
// closes its CloseCh to notify waiters. It is safe to call more than once for
// the same stream: a second call returns early since the map entry is gone.
func (s *Manager) stopStream(sr streamRequest, cancelFn func()) {
	s.mu.Lock()
	defer s.mu.Unlock()
	streamCtx, ok := s.streams[sr.Channel]
	if !ok {
		// Already stopped by another caller.
		return
	}
	closeCh := streamCtx.CloseCh
	delete(s.streams, sr.Channel)
	if sr.PluginContext.DataSourceInstanceSettings != nil {
		// Also detach the channel from its datasource bookkeeping.
		dsUID := sr.PluginContext.DataSourceInstanceSettings.UID
		dsKey := datasourceKey(sr.PluginContext.OrgID, dsUID)
		delete(s.datasourceStreams[dsKey], sr.Channel)
	}
	cancelFn()
	close(closeCh)
}
171
172func (s *Manager) watchStream(ctx context.Context, cancelFn func(), sr streamRequest) {
173	numNoSubscribersChecks := 0
174	presenceTicker := time.NewTicker(s.checkInterval)
175	defer presenceTicker.Stop()
176	datasourceTicker := time.NewTicker(s.datasourceCheckInterval)
177	defer datasourceTicker.Stop()
178	for {
179		select {
180		case <-ctx.Done():
181			return
182		case <-datasourceTicker.C:
183			if sr.PluginContext.DataSourceInstanceSettings != nil {
184				dsUID := sr.PluginContext.DataSourceInstanceSettings.UID
185				pCtx, ok, err := s.pluginContextGetter.GetPluginContext(sr.user, sr.PluginContext.PluginID, dsUID, false)
186				if err != nil {
187					logger.Error("Error getting datasource context", "channel", sr.Channel, "path", sr.Path, "error", err)
188					continue
189				}
190				if !ok {
191					logger.Debug("Datasource not found, stop stream", "channel", sr.Channel, "path", sr.Path)
192					return
193				}
194				if pCtx.DataSourceInstanceSettings.Updated != sr.PluginContext.DataSourceInstanceSettings.Updated {
195					logger.Debug("Datasource changed, re-establish stream", "channel", sr.Channel, "path", sr.Path)
196					err := s.HandleDatasourceUpdate(pCtx.OrgID, dsUID)
197					if err != nil {
198						logger.Error("Error re-establishing stream", "channel", sr.Channel, "path", sr.Path, "error", err)
199						continue
200					}
201					return
202				}
203			}
204		case <-presenceTicker.C:
205			numSubscribers, err := s.presenceGetter.GetNumLocalSubscribers(sr.Channel)
206			if err != nil {
207				logger.Error("Error checking num subscribers", "channel", sr.Channel, "path", sr.Path, "error", err)
208				continue
209			}
210			if numSubscribers > 0 {
211				// reset counter since channel has active subscribers.
212				numNoSubscribersChecks = 0
213				continue
214			}
215			numNoSubscribersChecks++
216			if numNoSubscribersChecks >= s.maxChecks {
217				logger.Debug("Stop stream since no active subscribers", "channel", sr.Channel, "path", sr.Path)
218				s.stopStream(sr, cancelFn)
219				return
220			}
221		}
222	}
223}
224
// streamDurationThreshold is the minimum run duration after which a stream is
// considered to have started successfully.
const streamDurationThreshold = 100 * time.Millisecond

// coolDownDelay is the base delay for the exponential reconnect backoff.
const coolDownDelay = 100 * time.Millisecond

// maxDelay caps the reconnect backoff delay.
const maxDelay = 5 * time.Second

// getDelay returns the exponential backoff delay (coolDownDelay * 2^numErrors,
// capped at maxDelay) before the next reconnect attempt. Zero errors means no
// delay at all.
func getDelay(numErrors int) time.Duration {
	if numErrors == 0 {
		return 0
	}
	backoff := coolDownDelay * time.Duration(math.Pow(2, float64(numErrors)))
	if backoff <= maxDelay {
		return backoff
	}
	return maxDelay
}
239
// runStream runs the plugin stream until ctx is canceled or the stream
// finishes without an error, re-establishing it with exponential backoff when
// it fails quickly. On exit the stream is always removed from the registries
// via stopStream.
func (s *Manager) runStream(ctx context.Context, cancelFn func(), sr streamRequest) {
	defer func() { s.stopStream(sr, cancelFn) }()
	var numFastErrors int
	var delay time.Duration
	var isReconnect bool
	startTime := time.Now()
	for {
		// Bail out promptly if the stream context was canceled.
		select {
		case <-ctx.Done():
			return
		default:
		}

		pluginCtx := sr.PluginContext

		if isReconnect {
			// Best effort to cool down re-establishment process. We don't have a
			// nice way to understand whether we really need to wait here - so relying
			// on duration time of running a stream.
			if time.Since(startTime) < streamDurationThreshold {
				if delay < maxDelay {
					// Due to not calling getDelay after we have delay larger than maxDelay
					// we avoid possible float overflow errors while calculating delay duration
					// based on numFastErrors.
					delay = getDelay(numFastErrors)
				}
				numFastErrors++
			} else {
				// Assuming that stream successfully started.
				delay = 0
				numFastErrors = 0
			}
			// Sleep for the computed backoff, but stay cancelable.
			select {
			case <-ctx.Done():
				return
			case <-time.After(delay):
			}
			startTime = time.Now()

			// Resolve new plugin context as it could be modified since last call.
			// We are using the same user here which initiated stream originally.
			var datasourceUID string
			if pluginCtx.DataSourceInstanceSettings != nil {
				datasourceUID = pluginCtx.DataSourceInstanceSettings.UID
			}
			newPluginCtx, ok, err := s.pluginContextGetter.GetPluginContext(sr.user, pluginCtx.PluginID, datasourceUID, false)
			if err != nil {
				logger.Error("Error getting plugin context", "path", sr.Path, "error", err)
				isReconnect = true
				continue
			}
			if !ok {
				logger.Info("No plugin context found, stopping stream", "path", sr.Path)
				return
			}
			pluginCtx = newPluginCtx
		}

		// Blocks until the plugin stream ends (error, completion, or cancel).
		err := sr.StreamRunner.RunStream(
			ctx,
			&backend.RunStreamRequest{
				PluginContext: pluginCtx,
				Path:          sr.Path,
			},
			backend.NewStreamSender(&packetSender{channelLocalPublisher: s.channelSender, channel: sr.Channel}),
		)
		if err != nil {
			if errors.Is(ctx.Err(), context.Canceled) {
				// Context canceled: treat any error as a clean shutdown.
				logger.Debug("Stream cleanly finished", "path", sr.Path)
				return
			}
			logger.Error("Error running stream, re-establishing", "path", sr.Path, "error", err, "wait", delay)
			isReconnect = true
			continue
		}
		logger.Debug("Stream finished without error, stopping it", "path", sr.Path)
		return
	}
}
320
// errClosed is returned by SubmitStream after the Manager has been shut down.
var errClosed = errors.New("stream manager closed")
322
// streamContext is the per-stream bookkeeping kept in Manager.streams.
type streamContext struct {
	CloseCh       chan struct{} // closed by stopStream once the stream fully exits
	cancelFn      func()        // cancels the stream's context
	streamRequest streamRequest // original request, kept for re-submission
}
328
// registerStream registers a new stream for the request's channel (answering
// immediately with StreamExists if one is already running), then blocks
// running the stream itself until it finishes. A watcher goroutine is started
// alongside to stop the stream when it is no longer needed.
func (s *Manager) registerStream(ctx context.Context, sr submitRequest) {
	s.mu.Lock()
	if streamCtx, ok := s.streams[sr.streamRequest.Channel]; ok {
		// Stream already running: hand back its CloseCh so the caller can
		// observe when it stops. responseCh is buffered, so this never blocks.
		s.mu.Unlock()
		sr.responseCh <- submitResponse{Result: submitResult{StreamExists: true, CloseNotify: streamCtx.CloseCh}}
		return
	}
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	closeCh := make(chan struct{})
	s.streams[sr.streamRequest.Channel] = streamContext{
		CloseCh:       closeCh,
		cancelFn:      cancel,
		streamRequest: sr.streamRequest,
	}
	if sr.streamRequest.PluginContext.DataSourceInstanceSettings != nil {
		// Track the channel under its datasource so datasource delete/update
		// events can find and stop it.
		dsUID := sr.streamRequest.PluginContext.DataSourceInstanceSettings.UID
		dsKey := datasourceKey(sr.streamRequest.PluginContext.OrgID, dsUID)
		if _, ok := s.datasourceStreams[dsKey]; !ok {
			s.datasourceStreams[dsKey] = map[string]struct{}{}
		}
		s.datasourceStreams[dsKey][sr.streamRequest.Channel] = struct{}{}
	}
	s.mu.Unlock()
	sr.responseCh <- submitResponse{Result: submitResult{StreamExists: false, CloseNotify: closeCh}}
	go s.watchStream(ctx, cancel, sr.streamRequest)
	// Run the stream synchronously; returns when the stream has stopped.
	s.runStream(ctx, cancel, sr.streamRequest)
}
357
358// Run Manager till context canceled.
359func (s *Manager) Run(ctx context.Context) error {
360	s.baseCtx = ctx
361	for {
362		select {
363		case sr := <-s.registerCh:
364			go s.registerStream(ctx, sr)
365		case <-ctx.Done():
366			close(s.closedCh)
367			return ctx.Err()
368		}
369	}
370}
371
// streamRequest describes a stream to run: which channel/path, for which user,
// with which plugin context and runner.
type streamRequest struct {
	Channel       string
	Path          string
	user          *models.SignedInUser
	PluginContext backend.PluginContext
	StreamRunner  StreamRunner
}
379
// submitRequest pairs a stream request with the channel used to deliver the
// registration outcome back to the submitter.
type submitRequest struct {
	responseCh    chan submitResponse // buffered (size 1) so registration never blocks on it
	streamRequest streamRequest
}
384
// submitResult is the successful outcome of a stream submission.
type submitResult struct {
	// StreamExists tells whether stream have been already opened.
	StreamExists bool
	// CloseNotify will be closed as soon as stream cleanly exited.
	CloseNotify chan struct{}
}
391
// submitResponse carries either a registration error or the submit result.
type submitResponse struct {
	Error  error
	Result submitResult
}
396
// errDatasourceNotFound is returned by SubmitStream when a re-submitted
// stream's datasource no longer exists.
var errDatasourceNotFound = errors.New("datasource not found")
398
399// SubmitStream submits stream handler in Manager to manage.
400// The stream will be opened and kept till channel has active subscribers.
401func (s *Manager) SubmitStream(ctx context.Context, user *models.SignedInUser, channel string, path string, pCtx backend.PluginContext, streamRunner StreamRunner, isResubmit bool) (*submitResult, error) {
402	if isResubmit {
403		// Resolve new plugin context as it could be modified since last call.
404		var datasourceUID string
405		if pCtx.DataSourceInstanceSettings != nil {
406			datasourceUID = pCtx.DataSourceInstanceSettings.UID
407		}
408		newPluginCtx, ok, err := s.pluginContextGetter.GetPluginContext(user, pCtx.PluginID, datasourceUID, false)
409		if err != nil {
410			return nil, err
411		}
412		if !ok {
413			return nil, errDatasourceNotFound
414		}
415		pCtx = newPluginCtx
416	}
417
418	req := submitRequest{
419		responseCh: make(chan submitResponse, 1),
420		streamRequest: streamRequest{
421			user:          user,
422			Channel:       channel,
423			Path:          path,
424			PluginContext: pCtx,
425			StreamRunner:  streamRunner,
426		},
427	}
428
429	// Send submit request.
430	select {
431	case s.registerCh <- req:
432	case <-s.closedCh:
433		close(s.registerCh)
434		return nil, errClosed
435	case <-ctx.Done():
436		return nil, ctx.Err()
437	}
438
439	// Wait for submit response.
440	select {
441	case resp := <-req.responseCh:
442		if resp.Error != nil {
443			return nil, resp.Error
444		}
445		return &resp.Result, nil
446	case <-s.closedCh:
447		return nil, errClosed
448	case <-ctx.Done():
449		return nil, ctx.Err()
450	}
451}
452