package runstream

import (
	"context"
	"errors"
	"fmt"
	"math"
	"sync"
	"time"

	"github.com/grafana/grafana/pkg/infra/log"
	"github.com/grafana/grafana/pkg/models"

	"github.com/grafana/grafana-plugin-sdk-go/backend"
)

var (
	logger = log.New("live.runstream")
)

//go:generate mockgen -destination=mock.go -package=runstream github.com/grafana/grafana/pkg/services/live/runstream ChannelLocalPublisher,NumLocalSubscribersGetter,StreamRunner,PluginContextGetter

// ChannelLocalPublisher publishes data into a channel on the current node.
type ChannelLocalPublisher interface {
	PublishLocal(channel string, data []byte) error
}

// PluginContextGetter resolves the current backend.PluginContext for a user,
// plugin and (optionally) datasource UID. The boolean result reports whether
// a context was found.
type PluginContextGetter interface {
	GetPluginContext(user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error)
}

// NumLocalSubscribersGetter reports channel subscriber counts.
type NumLocalSubscribersGetter interface {
	// GetNumLocalSubscribers returns the number of channel subscribers on the
	// local node only.
	GetNumLocalSubscribers(channel string) (int, error)
}

// StreamRunner is implemented by plugins capable of running a long-lived
// data stream for a channel path.
type StreamRunner interface {
	RunStream(ctx context.Context, request *backend.RunStreamRequest, sender *backend.StreamSender) error
}

// packetSender adapts ChannelLocalPublisher to the plugin SDK's stream sender:
// every packet received from a plugin is published into one fixed channel.
type packetSender struct {
	channelLocalPublisher ChannelLocalPublisher
	channel               string
}

func (p *packetSender) Send(packet *backend.StreamPacket) error {
	return p.channelLocalPublisher.PublishLocal(p.channel, packet.Data)
}

// Manager manages streams from Grafana to plugins (i.e. RunStream method).
type Manager struct {
	mu                      sync.RWMutex
	baseCtx                 context.Context                // set by Run; used to re-submit streams on datasource updates
	streams                 map[string]streamContext       // channel -> running stream state
	datasourceStreams       map[string]map[string]struct{} // datasourceKey -> set of channels using that datasource
	presenceGetter          NumLocalSubscribersGetter
	pluginContextGetter     PluginContextGetter
	channelSender           ChannelLocalPublisher
	registerCh              chan submitRequest // stream submissions consumed by Run
	closedCh                chan struct{}      // closed when Run exits
	checkInterval           time.Duration      // how often to poll subscriber presence
	maxChecks               int                // consecutive zero-subscriber checks before stopping a stream
	datasourceCheckInterval time.Duration      // how often to re-validate datasource existence/config
}

// ManagerOption modifies Manager behavior (used for tests for example).
66type ManagerOption func(*Manager) 67 68// WithCheckConfig allows setting custom check rules. 69func WithCheckConfig(interval time.Duration, maxChecks int) ManagerOption { 70 return func(sm *Manager) { 71 sm.checkInterval = interval 72 sm.maxChecks = maxChecks 73 } 74} 75 76const ( 77 defaultCheckInterval = 5 * time.Second 78 defaultDatasourceCheckInterval = 60 * time.Second 79 defaultMaxChecks = 3 80) 81 82// NewManager creates new Manager. 83func NewManager(channelSender ChannelLocalPublisher, presenceGetter NumLocalSubscribersGetter, pluginContextGetter PluginContextGetter, opts ...ManagerOption) *Manager { 84 sm := &Manager{ 85 streams: make(map[string]streamContext), 86 datasourceStreams: map[string]map[string]struct{}{}, 87 channelSender: channelSender, 88 presenceGetter: presenceGetter, 89 pluginContextGetter: pluginContextGetter, 90 registerCh: make(chan submitRequest), 91 closedCh: make(chan struct{}), 92 checkInterval: defaultCheckInterval, 93 maxChecks: defaultMaxChecks, 94 datasourceCheckInterval: defaultDatasourceCheckInterval, 95 } 96 for _, opt := range opts { 97 opt(sm) 98 } 99 return sm 100} 101 102func (s *Manager) HandleDatasourceDelete(orgID int64, dsUID string) error { 103 return s.handleDatasourceEvent(orgID, dsUID, false) 104} 105 106func (s *Manager) HandleDatasourceUpdate(orgID int64, dsUID string) error { 107 return s.handleDatasourceEvent(orgID, dsUID, true) 108} 109 110func (s *Manager) handleDatasourceEvent(orgID int64, dsUID string, resubmit bool) error { 111 dsKey := datasourceKey(orgID, dsUID) 112 s.mu.RLock() 113 dsStreams, ok := s.datasourceStreams[dsKey] 114 if !ok { 115 s.mu.RUnlock() 116 return nil 117 } 118 var resubmitRequests []streamRequest 119 var waitChannels []chan struct{} 120 for channel := range dsStreams { 121 streamCtx, ok := s.streams[channel] 122 if !ok { 123 continue 124 } 125 streamCtx.cancelFn() 126 waitChannels = append(waitChannels, streamCtx.CloseCh) 127 resubmitRequests = append(resubmitRequests, 
streamCtx.streamRequest) 128 } 129 s.mu.RUnlock() 130 131 // Wait for all streams to stop. 132 for _, ch := range waitChannels { 133 <-ch 134 } 135 136 if resubmit { 137 // Re-submit streams. 138 for _, sr := range resubmitRequests { 139 _, err := s.SubmitStream(s.baseCtx, sr.user, sr.Channel, sr.Path, sr.PluginContext, sr.StreamRunner, true) 140 if err != nil { 141 // Log error but do not prevent execution of caller routine. 142 logger.Error("Error re-submitting stream", "path", sr.Path, "error", err) 143 } 144 } 145 } 146 147 return nil 148} 149 150func datasourceKey(orgID int64, dsUID string) string { 151 return fmt.Sprintf("%d_%s", orgID, dsUID) 152} 153 154func (s *Manager) stopStream(sr streamRequest, cancelFn func()) { 155 s.mu.Lock() 156 defer s.mu.Unlock() 157 streamCtx, ok := s.streams[sr.Channel] 158 if !ok { 159 return 160 } 161 closeCh := streamCtx.CloseCh 162 delete(s.streams, sr.Channel) 163 if sr.PluginContext.DataSourceInstanceSettings != nil { 164 dsUID := sr.PluginContext.DataSourceInstanceSettings.UID 165 dsKey := datasourceKey(sr.PluginContext.OrgID, dsUID) 166 delete(s.datasourceStreams[dsKey], sr.Channel) 167 } 168 cancelFn() 169 close(closeCh) 170} 171 172func (s *Manager) watchStream(ctx context.Context, cancelFn func(), sr streamRequest) { 173 numNoSubscribersChecks := 0 174 presenceTicker := time.NewTicker(s.checkInterval) 175 defer presenceTicker.Stop() 176 datasourceTicker := time.NewTicker(s.datasourceCheckInterval) 177 defer datasourceTicker.Stop() 178 for { 179 select { 180 case <-ctx.Done(): 181 return 182 case <-datasourceTicker.C: 183 if sr.PluginContext.DataSourceInstanceSettings != nil { 184 dsUID := sr.PluginContext.DataSourceInstanceSettings.UID 185 pCtx, ok, err := s.pluginContextGetter.GetPluginContext(sr.user, sr.PluginContext.PluginID, dsUID, false) 186 if err != nil { 187 logger.Error("Error getting datasource context", "channel", sr.Channel, "path", sr.Path, "error", err) 188 continue 189 } 190 if !ok { 191 
logger.Debug("Datasource not found, stop stream", "channel", sr.Channel, "path", sr.Path) 192 return 193 } 194 if pCtx.DataSourceInstanceSettings.Updated != sr.PluginContext.DataSourceInstanceSettings.Updated { 195 logger.Debug("Datasource changed, re-establish stream", "channel", sr.Channel, "path", sr.Path) 196 err := s.HandleDatasourceUpdate(pCtx.OrgID, dsUID) 197 if err != nil { 198 logger.Error("Error re-establishing stream", "channel", sr.Channel, "path", sr.Path, "error", err) 199 continue 200 } 201 return 202 } 203 } 204 case <-presenceTicker.C: 205 numSubscribers, err := s.presenceGetter.GetNumLocalSubscribers(sr.Channel) 206 if err != nil { 207 logger.Error("Error checking num subscribers", "channel", sr.Channel, "path", sr.Path, "error", err) 208 continue 209 } 210 if numSubscribers > 0 { 211 // reset counter since channel has active subscribers. 212 numNoSubscribersChecks = 0 213 continue 214 } 215 numNoSubscribersChecks++ 216 if numNoSubscribersChecks >= s.maxChecks { 217 logger.Debug("Stop stream since no active subscribers", "channel", sr.Channel, "path", sr.Path) 218 s.stopStream(sr, cancelFn) 219 return 220 } 221 } 222 } 223} 224 225const streamDurationThreshold = 100 * time.Millisecond 226const coolDownDelay = 100 * time.Millisecond 227const maxDelay = 5 * time.Second 228 229func getDelay(numErrors int) time.Duration { 230 if numErrors == 0 { 231 return 0 232 } 233 delay := coolDownDelay * time.Duration(math.Pow(2, float64(numErrors))) 234 if delay > maxDelay { 235 return maxDelay 236 } 237 return delay 238} 239 240// run stream until context canceled or stream finished without an error. 
241func (s *Manager) runStream(ctx context.Context, cancelFn func(), sr streamRequest) { 242 defer func() { s.stopStream(sr, cancelFn) }() 243 var numFastErrors int 244 var delay time.Duration 245 var isReconnect bool 246 startTime := time.Now() 247 for { 248 select { 249 case <-ctx.Done(): 250 return 251 default: 252 } 253 254 pluginCtx := sr.PluginContext 255 256 if isReconnect { 257 // Best effort to cool down re-establishment process. We don't have a 258 // nice way to understand whether we really need to wait here - so relying 259 // on duration time of running a stream. 260 if time.Since(startTime) < streamDurationThreshold { 261 if delay < maxDelay { 262 // Due to not calling getDelay after we have delay larger than maxDelay 263 // we avoid possible float overflow errors while calculating delay duration 264 // based on numFastErrors. 265 delay = getDelay(numFastErrors) 266 } 267 numFastErrors++ 268 } else { 269 // Assuming that stream successfully started. 270 delay = 0 271 numFastErrors = 0 272 } 273 select { 274 case <-ctx.Done(): 275 return 276 case <-time.After(delay): 277 } 278 startTime = time.Now() 279 280 // Resolve new plugin context as it could be modified since last call. 281 // We are using the same user here which initiated stream originally. 
282 var datasourceUID string 283 if pluginCtx.DataSourceInstanceSettings != nil { 284 datasourceUID = pluginCtx.DataSourceInstanceSettings.UID 285 } 286 newPluginCtx, ok, err := s.pluginContextGetter.GetPluginContext(sr.user, pluginCtx.PluginID, datasourceUID, false) 287 if err != nil { 288 logger.Error("Error getting plugin context", "path", sr.Path, "error", err) 289 isReconnect = true 290 continue 291 } 292 if !ok { 293 logger.Info("No plugin context found, stopping stream", "path", sr.Path) 294 return 295 } 296 pluginCtx = newPluginCtx 297 } 298 299 err := sr.StreamRunner.RunStream( 300 ctx, 301 &backend.RunStreamRequest{ 302 PluginContext: pluginCtx, 303 Path: sr.Path, 304 }, 305 backend.NewStreamSender(&packetSender{channelLocalPublisher: s.channelSender, channel: sr.Channel}), 306 ) 307 if err != nil { 308 if errors.Is(ctx.Err(), context.Canceled) { 309 logger.Debug("Stream cleanly finished", "path", sr.Path) 310 return 311 } 312 logger.Error("Error running stream, re-establishing", "path", sr.Path, "error", err, "wait", delay) 313 isReconnect = true 314 continue 315 } 316 logger.Debug("Stream finished without error, stopping it", "path", sr.Path) 317 return 318 } 319} 320 321var errClosed = errors.New("stream manager closed") 322 323type streamContext struct { 324 CloseCh chan struct{} 325 cancelFn func() 326 streamRequest streamRequest 327} 328 329func (s *Manager) registerStream(ctx context.Context, sr submitRequest) { 330 s.mu.Lock() 331 if streamCtx, ok := s.streams[sr.streamRequest.Channel]; ok { 332 s.mu.Unlock() 333 sr.responseCh <- submitResponse{Result: submitResult{StreamExists: true, CloseNotify: streamCtx.CloseCh}} 334 return 335 } 336 ctx, cancel := context.WithCancel(ctx) 337 defer cancel() 338 closeCh := make(chan struct{}) 339 s.streams[sr.streamRequest.Channel] = streamContext{ 340 CloseCh: closeCh, 341 cancelFn: cancel, 342 streamRequest: sr.streamRequest, 343 } 344 if sr.streamRequest.PluginContext.DataSourceInstanceSettings != nil { 345 
dsUID := sr.streamRequest.PluginContext.DataSourceInstanceSettings.UID 346 dsKey := datasourceKey(sr.streamRequest.PluginContext.OrgID, dsUID) 347 if _, ok := s.datasourceStreams[dsKey]; !ok { 348 s.datasourceStreams[dsKey] = map[string]struct{}{} 349 } 350 s.datasourceStreams[dsKey][sr.streamRequest.Channel] = struct{}{} 351 } 352 s.mu.Unlock() 353 sr.responseCh <- submitResponse{Result: submitResult{StreamExists: false, CloseNotify: closeCh}} 354 go s.watchStream(ctx, cancel, sr.streamRequest) 355 s.runStream(ctx, cancel, sr.streamRequest) 356} 357 358// Run Manager till context canceled. 359func (s *Manager) Run(ctx context.Context) error { 360 s.baseCtx = ctx 361 for { 362 select { 363 case sr := <-s.registerCh: 364 go s.registerStream(ctx, sr) 365 case <-ctx.Done(): 366 close(s.closedCh) 367 return ctx.Err() 368 } 369 } 370} 371 372type streamRequest struct { 373 Channel string 374 Path string 375 user *models.SignedInUser 376 PluginContext backend.PluginContext 377 StreamRunner StreamRunner 378} 379 380type submitRequest struct { 381 responseCh chan submitResponse 382 streamRequest streamRequest 383} 384 385type submitResult struct { 386 // StreamExists tells whether stream have been already opened. 387 StreamExists bool 388 // CloseNotify will be closed as soon as stream cleanly exited. 389 CloseNotify chan struct{} 390} 391 392type submitResponse struct { 393 Error error 394 Result submitResult 395} 396 397var errDatasourceNotFound = errors.New("datasource not found") 398 399// SubmitStream submits stream handler in Manager to manage. 400// The stream will be opened and kept till channel has active subscribers. 401func (s *Manager) SubmitStream(ctx context.Context, user *models.SignedInUser, channel string, path string, pCtx backend.PluginContext, streamRunner StreamRunner, isResubmit bool) (*submitResult, error) { 402 if isResubmit { 403 // Resolve new plugin context as it could be modified since last call. 
404 var datasourceUID string 405 if pCtx.DataSourceInstanceSettings != nil { 406 datasourceUID = pCtx.DataSourceInstanceSettings.UID 407 } 408 newPluginCtx, ok, err := s.pluginContextGetter.GetPluginContext(user, pCtx.PluginID, datasourceUID, false) 409 if err != nil { 410 return nil, err 411 } 412 if !ok { 413 return nil, errDatasourceNotFound 414 } 415 pCtx = newPluginCtx 416 } 417 418 req := submitRequest{ 419 responseCh: make(chan submitResponse, 1), 420 streamRequest: streamRequest{ 421 user: user, 422 Channel: channel, 423 Path: path, 424 PluginContext: pCtx, 425 StreamRunner: streamRunner, 426 }, 427 } 428 429 // Send submit request. 430 select { 431 case s.registerCh <- req: 432 case <-s.closedCh: 433 close(s.registerCh) 434 return nil, errClosed 435 case <-ctx.Done(): 436 return nil, ctx.Err() 437 } 438 439 // Wait for submit response. 440 select { 441 case resp := <-req.responseCh: 442 if resp.Error != nil { 443 return nil, resp.Error 444 } 445 return &resp.Result, nil 446 case <-s.closedCh: 447 return nil, errClosed 448 case <-ctx.Done(): 449 return nil, ctx.Err() 450 } 451} 452