1package managedstream
2
3import (
4	"context"
5	"encoding/json"
6	"fmt"
7	"sort"
8	"sync"
9	"time"
10
11	"github.com/grafana/grafana/pkg/services/live/orgchannel"
12
13	"github.com/grafana/grafana-plugin-sdk-go/backend"
14	"github.com/grafana/grafana-plugin-sdk-go/data"
15	"github.com/grafana/grafana-plugin-sdk-go/live"
16	"github.com/grafana/grafana/pkg/infra/log"
17	"github.com/grafana/grafana/pkg/models"
18)
19
20var (
21	logger = log.New("live.managed_stream")
22)
23
24// If message comes from a plugin:
25// 	* it's simply sent to local subscribers without any additional steps
//  * if there is a RULE then it may be processed in some way
27//  * important to keep a message in the original channel
28// 	* client subscribed to ds/<UID>/xxx
29//
30// What we want to build:
31// 	* Stream scope not hardcoded and determined by the caller
32// 	* So it's possible to use managed stream from plugins
// 	* The problem is HA – at the moment several plugins on different nodes publish the same messages
// 	* Can use an in-memory managed stream (publishing to local subscribers) for plugins, and an HA-managed stream for HTTP/WS
// 	* Eventually maintain a single connection with a plugin via channel leader selection.
36
// Runner keeps NamespaceStream per namespace.
//
// Streams live in a two-level map: the outer key is the organization ID and
// the inner key is the "<scope>/<namespace>" prefix built by
// GetOrCreateStream, which holds mu while it reads and writes that map.
// The publisher, localPublisher and frameCache values are passed on to every
// NamespaceStream the Runner creates; frameCache is also queried by
// GetManagedChannels for the active channels of an organization.
type Runner struct {
	mu             sync.RWMutex
	streams        map[int64]map[string]*NamespaceStream
	publisher      models.ChannelPublisher
	localPublisher LocalPublisher
	frameCache     FrameCache
}
45
// LocalPublisher publishes raw data into a channel via PublishLocal.
// NamespaceStream.Push uses it for streams in the datasource and plugin
// scopes, passing a channel name that already has the organization ID
// prepended.
// NOTE(review): the name suggests delivery is limited to subscribers on the
// current node – confirm against the implementation.
type LocalPublisher interface {
	PublishLocal(channel string, data []byte) error
}
49
50// NewRunner creates new Runner.
51func NewRunner(publisher models.ChannelPublisher, localPublisher LocalPublisher, frameCache FrameCache) *Runner {
52	return &Runner{
53		publisher:      publisher,
54		localPublisher: localPublisher,
55		streams:        map[int64]map[string]*NamespaceStream{},
56		frameCache:     frameCache,
57	}
58}
59
60func (r *Runner) GetManagedChannels(orgID int64) ([]*ManagedChannel, error) {
61	activeChannels, err := r.frameCache.GetActiveChannels(orgID)
62	if err != nil {
63		return []*ManagedChannel{}, fmt.Errorf("error getting active managed stream paths: %v", err)
64	}
65	channels := make([]*ManagedChannel, 0, len(activeChannels))
66	for ch, schema := range activeChannels {
67		managedChannel := &ManagedChannel{
68			Channel: ch,
69			Data:    schema,
70		}
71		// Enrich with minute rate.
72		channel, _ := live.ParseChannel(managedChannel.Channel)
73		prefix := channel.Scope + "/" + channel.Namespace
74		namespaceStream, ok := r.streams[orgID][prefix]
75		if ok {
76			managedChannel.MinuteRate = namespaceStream.minuteRate(channel.Path)
77		}
78		channels = append(channels, managedChannel)
79	}
80
81	// Hardcode sample streams
82	frameJSON, err := data.FrameToJSON(data.NewFrame("testdata",
83		data.NewField("Time", nil, make([]time.Time, 0)),
84		data.NewField("Value", nil, make([]float64, 0)),
85		data.NewField("Min", nil, make([]float64, 0)),
86		data.NewField("Max", nil, make([]float64, 0)),
87	), data.IncludeSchemaOnly)
88	if err == nil {
89		channels = append(channels, &ManagedChannel{
90			Channel:    "plugin/testdata/random-2s-stream",
91			Data:       frameJSON,
92			MinuteRate: 30,
93		}, &ManagedChannel{
94			Channel:    "plugin/testdata/random-flakey-stream",
95			Data:       frameJSON,
96			MinuteRate: 150,
97		}, &ManagedChannel{
98			Channel:    "plugin/testdata/random-20Hz-stream",
99			Data:       frameJSON,
100			MinuteRate: 1200,
101		})
102	}
103
104	sort.Slice(channels, func(i, j int) bool {
105		return channels[i].Channel < channels[j].Channel
106	})
107
108	return channels, nil
109}
110
111// GetOrCreateStream -- for now this will create new manager for each key.
112// Eventually, the stream behavior will need to be configured explicitly
113func (r *Runner) GetOrCreateStream(orgID int64, scope string, namespace string) (*NamespaceStream, error) {
114	r.mu.Lock()
115	defer r.mu.Unlock()
116	_, ok := r.streams[orgID]
117	if !ok {
118		r.streams[orgID] = map[string]*NamespaceStream{}
119	}
120	prefix := scope + "/" + namespace
121	s, ok := r.streams[orgID][prefix]
122	if !ok {
123		s = NewNamespaceStream(orgID, scope, namespace, r.publisher, r.localPublisher, r.frameCache)
124		r.streams[orgID][prefix] = s
125	}
126	return s, nil
127}
128
// NamespaceStream holds the state of a managed stream.
//
// A stream is identified by its organization ID, scope and namespace; these
// are used by Push to build the channel a frame is published into. Push
// publishes through localPublisher for the datasource and plugin scopes and
// through publisher otherwise, and records frames in frameCache.
//
// rates maps a channel path to a fixed array of 60 rateEntry slots that
// incRate updates and minuteRate sums; both take rateMu while touching it.
type NamespaceStream struct {
	orgID          int64
	scope          string
	namespace      string
	publisher      models.ChannelPublisher
	localPublisher LocalPublisher
	frameCache     FrameCache
	rateMu         sync.RWMutex
	rates          map[string][60]rateEntry
}
140
// rateEntry is one slot of a per-path rate window. time holds the Unix
// second (converted to uint32) the slot was last written for, and count the
// number of increments recorded during that second.
type rateEntry struct {
	time  uint32
	count int32
}
145
// ManagedChannel represents a managed stream.
//
// Channel is the channel identifier, MinuteRate the number of messages
// counted for the channel within the last minute (or a hardcoded value for
// the sample channels), and Data the raw JSON reported for the channel.
// NOTE(review): Data appears to carry the frame schema – confirm against the
// FrameCache implementation.
type ManagedChannel struct {
	Channel    string          `json:"channel"`
	MinuteRate int64           `json:"minute_rate"`
	Data       json.RawMessage `json:"data"`
}
152
153// NewNamespaceStream creates new NamespaceStream.
154func NewNamespaceStream(orgID int64, scope string, namespace string, publisher models.ChannelPublisher, localPublisher LocalPublisher, schemaUpdater FrameCache) *NamespaceStream {
155	return &NamespaceStream{
156		orgID:          orgID,
157		scope:          scope,
158		namespace:      namespace,
159		publisher:      publisher,
160		localPublisher: localPublisher,
161		frameCache:     schemaUpdater,
162		rates:          map[string][60]rateEntry{},
163	}
164}
165
166// Push sends frame to the stream and saves it for later retrieval by subscribers.
167// * Saves the entire frame to cache.
168// * If schema has been changed sends entire frame to channel, otherwise only data.
169func (s *NamespaceStream) Push(path string, frame *data.Frame) error {
170	jsonFrameCache, err := data.FrameToJSONCache(frame)
171	if err != nil {
172		return err
173	}
174
175	// The channel this will be posted into.
176	channel := live.Channel{Scope: s.scope, Namespace: s.namespace, Path: path}.String()
177
178	isUpdated, err := s.frameCache.Update(s.orgID, channel, jsonFrameCache)
179	if err != nil {
180		logger.Error("Error updating managed stream schema", "error", err)
181		return err
182	}
183
184	// When the schema has not changed, just send the data.
185	include := data.IncludeDataOnly
186	if isUpdated {
187		// When the schema has been changed, send all.
188		include = data.IncludeAll
189	}
190	frameJSON := jsonFrameCache.Bytes(include)
191
192	logger.Debug("Publish data to channel", "channel", channel, "dataLength", len(frameJSON))
193	s.incRate(path, time.Now().Unix())
194	if s.scope == live.ScopeDatasource || s.scope == live.ScopePlugin {
195		return s.localPublisher.PublishLocal(orgchannel.PrependOrgID(s.orgID, channel), frameJSON)
196	}
197	return s.publisher(s.orgID, channel, frameJSON)
198}
199
200func (s *NamespaceStream) incRate(path string, nowUnix int64) {
201	s.rateMu.Lock()
202	pathRate, ok := s.rates[path]
203	if !ok {
204		pathRate = [60]rateEntry{}
205	}
206	now := time.Unix(nowUnix, 0)
207	slot := now.Second() % 60
208	if pathRate[slot].time != uint32(nowUnix) {
209		pathRate[slot].count = 0
210	}
211	pathRate[slot].time = uint32(nowUnix)
212	pathRate[slot].count += 1
213	s.rates[path] = pathRate
214	s.rateMu.Unlock()
215}
216
217func (s *NamespaceStream) minuteRate(path string) int64 {
218	var total int64
219	s.rateMu.RLock()
220	defer s.rateMu.RUnlock()
221	pathRate, ok := s.rates[path]
222	if !ok {
223		return 0
224	}
225	for _, val := range pathRate {
226		if val.time > uint32(time.Now().Unix()-60) {
227			total += int64(val.count)
228		}
229	}
230	return total
231}
232
233func (s *NamespaceStream) GetHandlerForPath(_ string) (models.ChannelHandler, error) {
234	return s, nil
235}
236
237func (s *NamespaceStream) OnSubscribe(_ context.Context, u *models.SignedInUser, e models.SubscribeEvent) (models.SubscribeReply, backend.SubscribeStreamStatus, error) {
238	reply := models.SubscribeReply{}
239	frameJSON, ok, err := s.frameCache.GetFrame(u.OrgId, e.Channel)
240	if err != nil {
241		return reply, 0, err
242	}
243	if ok {
244		reply.Data = frameJSON
245	}
246	return reply, backend.SubscribeStreamStatusOK, nil
247}
248
249func (s *NamespaceStream) OnPublish(_ context.Context, _ *models.SignedInUser, _ models.PublishEvent) (models.PublishReply, backend.PublishStreamStatus, error) {
250	return models.PublishReply{}, backend.PublishStreamStatusPermissionDenied, nil
251}
252