1package managedstream 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "sort" 8 "sync" 9 "time" 10 11 "github.com/grafana/grafana/pkg/services/live/orgchannel" 12 13 "github.com/grafana/grafana-plugin-sdk-go/backend" 14 "github.com/grafana/grafana-plugin-sdk-go/data" 15 "github.com/grafana/grafana-plugin-sdk-go/live" 16 "github.com/grafana/grafana/pkg/infra/log" 17 "github.com/grafana/grafana/pkg/models" 18) 19 20var ( 21 logger = log.New("live.managed_stream") 22) 23 24// If message comes from a plugin: 25// * it's simply sent to local subscribers without any additional steps 26// * if there is RULE then may be processed in some way 27// * important to keep a message in the original channel 28// * client subscribed to ds/<UID>/xxx 29// 30// What we want to build: 31// * Stream scope not hardcoded and determined by the caller 32// * So it's possible to use managed stream from plugins 33// * The problem is HA – at moment several plugins on different nodes publish same messages 34// * Can use in-memory managed stream for plugins with local subscribers publish, use HA-managed stream for HTTP/WS 35// * Eventually maintain a single connection with a plugin over a channel leader selection. 36 37// Runner keeps NamespaceStream per namespace. 38type Runner struct { 39 mu sync.RWMutex 40 streams map[int64]map[string]*NamespaceStream 41 publisher models.ChannelPublisher 42 localPublisher LocalPublisher 43 frameCache FrameCache 44} 45 46type LocalPublisher interface { 47 PublishLocal(channel string, data []byte) error 48} 49 50// NewRunner creates new Runner. 51func NewRunner(publisher models.ChannelPublisher, localPublisher LocalPublisher, frameCache FrameCache) *Runner { 52 return &Runner{ 53 publisher: publisher, 54 localPublisher: localPublisher, 55 streams: map[int64]map[string]*NamespaceStream{}, 56 frameCache: frameCache, 57 } 58} 59 60func (r *Runner) GetManagedChannels(orgID int64) ([]*ManagedChannel, error) { 61 activeChannels, err := r.frameCache.GetActiveChannels(orgID) 62 if err != nil { 63 return []*ManagedChannel{}, fmt.Errorf("error getting active managed stream paths: %v", err) 64 } 65 channels := make([]*ManagedChannel, 0, len(activeChannels)) 66 for ch, schema := range activeChannels { 67 managedChannel := &ManagedChannel{ 68 Channel: ch, 69 Data: schema, 70 } 71 // Enrich with minute rate. 72 channel, _ := live.ParseChannel(managedChannel.Channel) 73 prefix := channel.Scope + "/" + channel.Namespace 74 namespaceStream, ok := r.streams[orgID][prefix] 75 if ok { 76 managedChannel.MinuteRate = namespaceStream.minuteRate(channel.Path) 77 } 78 channels = append(channels, managedChannel) 79 } 80 81 // Hardcode sample streams 82 frameJSON, err := data.FrameToJSON(data.NewFrame("testdata", 83 data.NewField("Time", nil, make([]time.Time, 0)), 84 data.NewField("Value", nil, make([]float64, 0)), 85 data.NewField("Min", nil, make([]float64, 0)), 86 data.NewField("Max", nil, make([]float64, 0)), 87 ), data.IncludeSchemaOnly) 88 if err == nil { 89 channels = append(channels, &ManagedChannel{ 90 Channel: "plugin/testdata/random-2s-stream", 91 Data: frameJSON, 92 MinuteRate: 30, 93 }, &ManagedChannel{ 94 Channel: "plugin/testdata/random-flakey-stream", 95 Data: frameJSON, 96 MinuteRate: 150, 97 }, &ManagedChannel{ 98 Channel: "plugin/testdata/random-20Hz-stream", 99 Data: frameJSON, 100 MinuteRate: 1200, 101 }) 102 } 103 104 sort.Slice(channels, func(i, j int) bool { 105 return channels[i].Channel < channels[j].Channel 106 }) 107 108 return channels, nil 109} 110 111// GetOrCreateStream -- for now this will create new manager for each key. 112// Eventually, the stream behavior will need to be configured explicitly 113func (r *Runner) GetOrCreateStream(orgID int64, scope string, namespace string) (*NamespaceStream, error) { 114 r.mu.Lock() 115 defer r.mu.Unlock() 116 _, ok := r.streams[orgID] 117 if !ok { 118 r.streams[orgID] = map[string]*NamespaceStream{} 119 } 120 prefix := scope + "/" + namespace 121 s, ok := r.streams[orgID][prefix] 122 if !ok { 123 s = NewNamespaceStream(orgID, scope, namespace, r.publisher, r.localPublisher, r.frameCache) 124 r.streams[orgID][prefix] = s 125 } 126 return s, nil 127} 128 129// NamespaceStream holds the state of a managed stream. 130type NamespaceStream struct { 131 orgID int64 132 scope string 133 namespace string 134 publisher models.ChannelPublisher 135 localPublisher LocalPublisher 136 frameCache FrameCache 137 rateMu sync.RWMutex 138 rates map[string][60]rateEntry 139} 140 141type rateEntry struct { 142 time uint32 143 count int32 144} 145 146// ManagedChannel represents a managed stream. 147type ManagedChannel struct { 148 Channel string `json:"channel"` 149 MinuteRate int64 `json:"minute_rate"` 150 Data json.RawMessage `json:"data"` 151} 152 153// NewNamespaceStream creates new NamespaceStream. 154func NewNamespaceStream(orgID int64, scope string, namespace string, publisher models.ChannelPublisher, localPublisher LocalPublisher, schemaUpdater FrameCache) *NamespaceStream { 155 return &NamespaceStream{ 156 orgID: orgID, 157 scope: scope, 158 namespace: namespace, 159 publisher: publisher, 160 localPublisher: localPublisher, 161 frameCache: schemaUpdater, 162 rates: map[string][60]rateEntry{}, 163 } 164} 165 166// Push sends frame to the stream and saves it for later retrieval by subscribers. 167// * Saves the entire frame to cache. 168// * If schema has been changed sends entire frame to channel, otherwise only data. 169func (s *NamespaceStream) Push(path string, frame *data.Frame) error { 170 jsonFrameCache, err := data.FrameToJSONCache(frame) 171 if err != nil { 172 return err 173 } 174 175 // The channel this will be posted into. 176 channel := live.Channel{Scope: s.scope, Namespace: s.namespace, Path: path}.String() 177 178 isUpdated, err := s.frameCache.Update(s.orgID, channel, jsonFrameCache) 179 if err != nil { 180 logger.Error("Error updating managed stream schema", "error", err) 181 return err 182 } 183 184 // When the schema has not changed, just send the data. 185 include := data.IncludeDataOnly 186 if isUpdated { 187 // When the schema has been changed, send all. 188 include = data.IncludeAll 189 } 190 frameJSON := jsonFrameCache.Bytes(include) 191 192 logger.Debug("Publish data to channel", "channel", channel, "dataLength", len(frameJSON)) 193 s.incRate(path, time.Now().Unix()) 194 if s.scope == live.ScopeDatasource || s.scope == live.ScopePlugin { 195 return s.localPublisher.PublishLocal(orgchannel.PrependOrgID(s.orgID, channel), frameJSON) 196 } 197 return s.publisher(s.orgID, channel, frameJSON) 198} 199 200func (s *NamespaceStream) incRate(path string, nowUnix int64) { 201 s.rateMu.Lock() 202 pathRate, ok := s.rates[path] 203 if !ok { 204 pathRate = [60]rateEntry{} 205 } 206 now := time.Unix(nowUnix, 0) 207 slot := now.Second() % 60 208 if pathRate[slot].time != uint32(nowUnix) { 209 pathRate[slot].count = 0 210 } 211 pathRate[slot].time = uint32(nowUnix) 212 pathRate[slot].count += 1 213 s.rates[path] = pathRate 214 s.rateMu.Unlock() 215} 216 217func (s *NamespaceStream) minuteRate(path string) int64 { 218 var total int64 219 s.rateMu.RLock() 220 defer s.rateMu.RUnlock() 221 pathRate, ok := s.rates[path] 222 if !ok { 223 return 0 224 } 225 for _, val := range pathRate { 226 if val.time > uint32(time.Now().Unix()-60) { 227 total += int64(val.count) 228 } 229 } 230 return total 231} 232 233func (s *NamespaceStream) GetHandlerForPath(_ string) (models.ChannelHandler, error) { 234 return s, nil 235} 236 237func (s *NamespaceStream) OnSubscribe(_ context.Context, u *models.SignedInUser, e models.SubscribeEvent) (models.SubscribeReply, backend.SubscribeStreamStatus, error) { 238 reply := models.SubscribeReply{} 239 frameJSON, ok, err := s.frameCache.GetFrame(u.OrgId, e.Channel) 240 if err != nil { 241 return reply, 0, err 242 } 243 if ok { 244 reply.Data = frameJSON 245 } 246 return reply, backend.SubscribeStreamStatusOK, nil 247} 248 249func (s *NamespaceStream) OnPublish(_ context.Context, _ *models.SignedInUser, _ models.PublishEvent) (models.PublishReply, backend.PublishStreamStatus, error) { 250 return models.PublishReply{}, backend.PublishStreamStatusPermissionDenied, nil 251} 252