1package backend
2
3import (
4	"context"
5	"encoding/json"
6	"fmt"
7
8	"github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2"
9
10	"github.com/grafana/grafana-plugin-sdk-go/data"
11)
12
13// StreamHandler handles streams.
14// This is EXPERIMENTAL and is a subject to change till Grafana 8.
15type StreamHandler interface {
16	// SubscribeStream called when a user tries to subscribe to a plugin/datasource
17	// managed channel path – thus plugin can check subscribe permissions and communicate
18	// options with Grafana Core. As soon as first subscriber joins channel RunStream
19	// will be called.
20	SubscribeStream(context.Context, *SubscribeStreamRequest) (*SubscribeStreamResponse, error)
21	// PublishStream called when a user tries to publish to a plugin/datasource
22	// managed channel path. Here plugin can check publish permissions and
23	// modify publication data if required.
24	PublishStream(context.Context, *PublishStreamRequest) (*PublishStreamResponse, error)
25	// RunStream will be initiated by Grafana to consume a stream. RunStream will be
26	// called once for the first client successfully subscribed to a channel path.
27	// When Grafana detects that there are no longer any subscribers inside a channel,
28	// the call will be terminated until next active subscriber appears. Call termination
29	// can happen with a delay.
30	RunStream(context.Context, *RunStreamRequest, *StreamSender) error
31}
32
33// SubscribeStreamRequest is EXPERIMENTAL and is a subject to change till Grafana 8.
34type SubscribeStreamRequest struct {
35	PluginContext PluginContext
36	Path          string
37}
38
// SubscribeStreamStatus is the status reported in a subscription response.
type SubscribeStreamStatus int32

// SubscribeStreamStatus values. They are numbered sequentially from zero via
// iota, so the declaration order below must not change.
const (
	// SubscribeStreamStatusOK (0) means the subscription is allowed.
	SubscribeStreamStatusOK SubscribeStreamStatus = iota
	// SubscribeStreamStatusNotFound (1) means the stream does not exist at all.
	SubscribeStreamStatusNotFound
	// SubscribeStreamStatusPermissionDenied (2) means the user is not allowed to subscribe.
	SubscribeStreamStatusPermissionDenied
)
50
51// SubscribeStreamResponse is EXPERIMENTAL and is a subject to change till Grafana 8.
52type SubscribeStreamResponse struct {
53	Status      SubscribeStreamStatus
54	InitialData *InitialData
55}
56
// InitialData is the data to send to a client upon a successful subscription
// to a channel. Build one with NewInitialFrame or NewInitialData.
type InitialData struct {
	data []byte // prepared payload bytes
}

// Data returns the prepared bytes of the initial data.
//
// It is safe to call on a nil *InitialData and returns nil in that case.
// SubscribeStreamResponse.InitialData is a pointer that can be left unset,
// so callers can read the payload without a separate nil check.
func (d *InitialData) Data() []byte {
	if d == nil {
		return nil
	}
	return d.data
}
66
67// NewInitialFrame allows creating frame as subscription InitialData.
68func NewInitialFrame(frame *data.Frame, include data.FrameInclude) (*InitialData, error) {
69	frameJSON, err := data.FrameToJSON(frame, include)
70	if err != nil {
71		return nil, err
72	}
73	return &InitialData{
74		data: frameJSON,
75	}, nil
76}
77
78// NewInitialData allows sending JSON on subscription
79func NewInitialData(data json.RawMessage) (*InitialData, error) {
80	if !json.Valid(data) {
81		return nil, fmt.Errorf("invalid JSON data")
82	}
83	return &InitialData{
84		data: data,
85	}, nil
86}
87
88// PublishStreamRequest is EXPERIMENTAL and is a subject to change till Grafana 8.
89type PublishStreamRequest struct {
90	PluginContext PluginContext
91	Path          string
92	Data          json.RawMessage
93}
94
// PublishStreamStatus is the status reported in a publication response.
type PublishStreamStatus int32

// PublishStreamStatus values. They are numbered sequentially from zero via
// iota, so the declaration order below must not change.
const (
	// PublishStreamStatusOK (0) means the publication is allowed.
	PublishStreamStatusOK PublishStreamStatus = iota
	// PublishStreamStatusNotFound (1) means the stream does not exist at all.
	PublishStreamStatusNotFound
	// PublishStreamStatusPermissionDenied (2) means the user is not allowed to publish.
	PublishStreamStatusPermissionDenied
)
106
107// PublishStreamResponse is EXPERIMENTAL and is a subject to change till Grafana 8.
108type PublishStreamResponse struct {
109	Status PublishStreamStatus
110	Data   json.RawMessage
111}
112
113// RunStreamRequest is EXPERIMENTAL and is a subject to change till Grafana 8.
114type RunStreamRequest struct {
115	PluginContext PluginContext
116	Path          string
117}
118
// StreamPacket is a single unit of data sent to a stream.
// It is EXPERIMENTAL and is subject to change until Grafana 8.
type StreamPacket struct {
	// Data is the packet payload.
	Data json.RawMessage
}
123
124// StreamPacketSender is EXPERIMENTAL and is a subject to change till Grafana 8.
125type StreamPacketSender interface {
126	Send(*StreamPacket) error
127}
128
129// StreamSender allows sending data to a stream.
130// StreamSender is EXPERIMENTAL and is a subject to change till Grafana 8.
131type StreamSender struct {
132	packetSender StreamPacketSender
133}
134
135func NewStreamSender(packetSender StreamPacketSender) *StreamSender {
136	return &StreamSender{packetSender: packetSender}
137}
138
139// SendFrame allows sending data.Frame to a stream.
140func (s *StreamSender) SendFrame(frame *data.Frame, include data.FrameInclude) error {
141	frameJSON, err := data.FrameToJSON(frame, include)
142	if err != nil {
143		return err
144	}
145	packet := &pluginv2.StreamPacket{
146		Data: frameJSON,
147	}
148	return s.packetSender.Send(FromProto().StreamPacket(packet))
149}
150
151// SendJSON allow sending arbitrary JSON to a stream. When sending data.Frame
152// prefer using SendFrame method.
153func (s *StreamSender) SendJSON(data []byte) error {
154	if !json.Valid(data) {
155		return fmt.Errorf("invalid JSON data")
156	}
157	packet := &pluginv2.StreamPacket{
158		Data: data,
159	}
160	return s.packetSender.Send(FromProto().StreamPacket(packet))
161}
162
163// SendBytes allow sending arbitrary Bytes to a stream. When sending data.Frame
164// prefer using SendFrame method. When sending an arbitrary raw JSON prefer
165// using SendJSON method.
166func (s *StreamSender) SendBytes(data []byte) error {
167	packet := &pluginv2.StreamPacket{
168		Data: data,
169	}
170	return s.packetSender.Send(FromProto().StreamPacket(packet))
171}
172