1package backend 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 8 "github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2" 9 10 "github.com/grafana/grafana-plugin-sdk-go/data" 11) 12 13// StreamHandler handles streams. 14// This is EXPERIMENTAL and is a subject to change till Grafana 8. 15type StreamHandler interface { 16 // SubscribeStream called when a user tries to subscribe to a plugin/datasource 17 // managed channel path – thus plugin can check subscribe permissions and communicate 18 // options with Grafana Core. As soon as first subscriber joins channel RunStream 19 // will be called. 20 SubscribeStream(context.Context, *SubscribeStreamRequest) (*SubscribeStreamResponse, error) 21 // PublishStream called when a user tries to publish to a plugin/datasource 22 // managed channel path. Here plugin can check publish permissions and 23 // modify publication data if required. 24 PublishStream(context.Context, *PublishStreamRequest) (*PublishStreamResponse, error) 25 // RunStream will be initiated by Grafana to consume a stream. RunStream will be 26 // called once for the first client successfully subscribed to a channel path. 27 // When Grafana detects that there are no longer any subscribers inside a channel, 28 // the call will be terminated until next active subscriber appears. Call termination 29 // can happen with a delay. 30 RunStream(context.Context, *RunStreamRequest, *StreamSender) error 31} 32 33// SubscribeStreamRequest is EXPERIMENTAL and is a subject to change till Grafana 8. 34type SubscribeStreamRequest struct { 35 PluginContext PluginContext 36 Path string 37} 38 39// SubscribeStreamStatus is a status of subscription response. 40type SubscribeStreamStatus int32 41 42const ( 43 // SubscribeStreamStatusOK means subscription is allowed. 44 SubscribeStreamStatusOK SubscribeStreamStatus = 0 45 // SubscribeStreamStatusNotFound means stream does not exist at all. 46 SubscribeStreamStatusNotFound SubscribeStreamStatus = 1 47 // SubscribeStreamStatusPermissionDenied means that user is not allowed to subscribe. 48 SubscribeStreamStatusPermissionDenied SubscribeStreamStatus = 2 49) 50 51// SubscribeStreamResponse is EXPERIMENTAL and is a subject to change till Grafana 8. 52type SubscribeStreamResponse struct { 53 Status SubscribeStreamStatus 54 InitialData *InitialData 55} 56 57// InitialData to send to a client upon a successful subscription to a channel. 58type InitialData struct { 59 data []byte 60} 61 62// Data allows to get prepared bytes of initial data. 63func (d *InitialData) Data() []byte { 64 return d.data 65} 66 67// NewInitialFrame allows creating frame as subscription InitialData. 68func NewInitialFrame(frame *data.Frame, include data.FrameInclude) (*InitialData, error) { 69 frameJSON, err := data.FrameToJSON(frame, include) 70 if err != nil { 71 return nil, err 72 } 73 return &InitialData{ 74 data: frameJSON, 75 }, nil 76} 77 78// NewInitialData allows sending JSON on subscription 79func NewInitialData(data json.RawMessage) (*InitialData, error) { 80 if !json.Valid(data) { 81 return nil, fmt.Errorf("invalid JSON data") 82 } 83 return &InitialData{ 84 data: data, 85 }, nil 86} 87 88// PublishStreamRequest is EXPERIMENTAL and is a subject to change till Grafana 8. 89type PublishStreamRequest struct { 90 PluginContext PluginContext 91 Path string 92 Data json.RawMessage 93} 94 95// PublishStreamStatus is a status of publication response. 96type PublishStreamStatus int32 97 98const ( 99 // PublishStreamStatusOK means publication is allowed. 100 PublishStreamStatusOK PublishStreamStatus = 0 101 // PublishStreamStatusNotFound means stream does not exist at all. 102 PublishStreamStatusNotFound PublishStreamStatus = 1 103 // PublishStreamStatusPermissionDenied means that user is not allowed to publish. 104 PublishStreamStatusPermissionDenied PublishStreamStatus = 2 105) 106 107// PublishStreamResponse is EXPERIMENTAL and is a subject to change till Grafana 8. 108type PublishStreamResponse struct { 109 Status PublishStreamStatus 110 Data json.RawMessage 111} 112 113// RunStreamRequest is EXPERIMENTAL and is a subject to change till Grafana 8. 114type RunStreamRequest struct { 115 PluginContext PluginContext 116 Path string 117} 118 119// StreamPacket is EXPERIMENTAL and is a subject to change till Grafana 8. 120type StreamPacket struct { 121 Data json.RawMessage 122} 123 124// StreamPacketSender is EXPERIMENTAL and is a subject to change till Grafana 8. 125type StreamPacketSender interface { 126 Send(*StreamPacket) error 127} 128 129// StreamSender allows sending data to a stream. 130// StreamSender is EXPERIMENTAL and is a subject to change till Grafana 8. 131type StreamSender struct { 132 packetSender StreamPacketSender 133} 134 135func NewStreamSender(packetSender StreamPacketSender) *StreamSender { 136 return &StreamSender{packetSender: packetSender} 137} 138 139// SendFrame allows sending data.Frame to a stream. 140func (s *StreamSender) SendFrame(frame *data.Frame, include data.FrameInclude) error { 141 frameJSON, err := data.FrameToJSON(frame, include) 142 if err != nil { 143 return err 144 } 145 packet := &pluginv2.StreamPacket{ 146 Data: frameJSON, 147 } 148 return s.packetSender.Send(FromProto().StreamPacket(packet)) 149} 150 151// SendJSON allow sending arbitrary JSON to a stream. When sending data.Frame 152// prefer using SendFrame method. 153func (s *StreamSender) SendJSON(data []byte) error { 154 if !json.Valid(data) { 155 return fmt.Errorf("invalid JSON data") 156 } 157 packet := &pluginv2.StreamPacket{ 158 Data: data, 159 } 160 return s.packetSender.Send(FromProto().StreamPacket(packet)) 161} 162 163// SendBytes allow sending arbitrary Bytes to a stream. When sending data.Frame 164// prefer using SendFrame method. When sending an arbitrary raw JSON prefer 165// using SendJSON method. 166func (s *StreamSender) SendBytes(data []byte) error { 167 packet := &pluginv2.StreamPacket{ 168 Data: data, 169 } 170 return s.packetSender.Send(FromProto().StreamPacket(packet)) 171} 172