1import Centrifuge from 'centrifuge/dist/centrifuge';
2import { LiveDataStreamOptions } from '@grafana/runtime';
3import { toDataQueryError } from '@grafana/runtime/src/utils/toDataQueryError';
4import { BehaviorSubject, Observable } from 'rxjs';
5import {
6  DataFrame,
7  DataFrameJSON,
8  dataFrameToJSON,
9  DataQueryResponse,
10  isLiveChannelMessageEvent,
11  isLiveChannelStatusEvent,
12  LiveChannelAddress,
13  LiveChannelConnectionState,
14  LiveChannelEvent,
15  LiveChannelPresenceStatus,
16  LoadingState,
17  StreamingDataFrame,
18  toDataFrameDTO,
19} from '@grafana/data';
20import { CentrifugeLiveChannel } from './channel';
21
22export type CentrifugeSrvDeps = {
23  appUrl: string;
24  orgId: number;
25  orgRole: string;
26  sessionId: string;
27  liveEnabled: boolean;
28  dataStreamSubscriberReadiness: Observable<boolean>;
29};
30
31export interface CentrifugeSrv {
32  /**
33   * Listen for changes to the connection state
34   */
35  getConnectionState(): Observable<boolean>;
36
37  /**
38   * Watch for messages in a channel
39   */
40  getStream<T>(address: LiveChannelAddress): Observable<LiveChannelEvent<T>>;
41
42  /**
43   * Connect to a channel and return results as DataFrames
44   */
45  getDataStream(options: LiveDataStreamOptions): Observable<DataQueryResponse>;
46
47  /**
48   * For channels that support presence, this will request the current state from the server.
49   *
50   * Join and leave messages will be sent to the open stream
51   */
52  getPresence(address: LiveChannelAddress): Promise<LiveChannelPresenceStatus>;
53}
54
55export class CentrifugeService implements CentrifugeSrv {
56  readonly open = new Map<string, CentrifugeLiveChannel>();
57  readonly centrifuge: Centrifuge;
58  readonly connectionState: BehaviorSubject<boolean>;
59  readonly connectionBlocker: Promise<void>;
60  private dataStreamSubscriberReady = true;
61
62  constructor(private deps: CentrifugeSrvDeps) {
63    deps.dataStreamSubscriberReadiness.subscribe((next) => (this.dataStreamSubscriberReady = next));
64    const liveUrl = `${deps.appUrl.replace(/^http/, 'ws')}/api/live/ws`;
65    this.centrifuge = new Centrifuge(liveUrl, {});
66    this.centrifuge.setConnectData({
67      sessionId: deps.sessionId,
68      orgId: deps.orgId,
69    });
70    // orgRole is set when logged in *or* anonomus users can use grafana
71    if (deps.liveEnabled && deps.orgRole !== '') {
72      this.centrifuge.connect(); // do connection
73    }
74    this.connectionState = new BehaviorSubject<boolean>(this.centrifuge.isConnected());
75    this.connectionBlocker = new Promise<void>((resolve) => {
76      if (this.centrifuge.isConnected()) {
77        return resolve();
78      }
79      const connectListener = () => {
80        resolve();
81        this.centrifuge.removeListener('connect', connectListener);
82      };
83      this.centrifuge.addListener('connect', connectListener);
84    });
85
86    // Register global listeners
87    this.centrifuge.on('connect', this.onConnect);
88    this.centrifuge.on('disconnect', this.onDisconnect);
89    this.centrifuge.on('publish', this.onServerSideMessage);
90  }
91
92  //----------------------------------------------------------
93  // Internal functions
94  //----------------------------------------------------------
95
96  private onConnect = (context: any) => {
97    this.connectionState.next(true);
98  };
99
100  private onDisconnect = (context: any) => {
101    this.connectionState.next(false);
102  };
103
104  private onServerSideMessage = (context: any) => {
105    console.log('Publication from server-side channel', context);
106  };
107
108  /**
109   * Get a channel.  If the scope, namespace, or path is invalid, a shutdown
110   * channel will be returned with an error state indicated in its status
111   */
112  private getChannel<TMessage>(addr: LiveChannelAddress): CentrifugeLiveChannel<TMessage> {
113    const id = `${this.deps.orgId}/${addr.scope}/${addr.namespace}/${addr.path}`;
114    let channel = this.open.get(id);
115    if (channel != null) {
116      return channel;
117    }
118
119    channel = new CentrifugeLiveChannel(id, addr);
120    if (channel.currentStatus.state === LiveChannelConnectionState.Invalid) {
121      return channel;
122    }
123    channel.shutdownCallback = () => {
124      this.open.delete(id); // remove it from the list of open channels
125    };
126    this.open.set(id, channel);
127
128    // Initialize the channel in the background
129    this.initChannel(channel).catch((err) => {
130      if (channel) {
131        channel.currentStatus.state = LiveChannelConnectionState.Invalid;
132        channel.shutdownWithError(err);
133      }
134      this.open.delete(id);
135    });
136
137    // return the not-yet initalized channel
138    return channel;
139  }
140
141  private async initChannel(channel: CentrifugeLiveChannel): Promise<void> {
142    const events = channel.initalize();
143    if (!this.centrifuge.isConnected()) {
144      await this.connectionBlocker;
145    }
146    channel.subscription = this.centrifuge.subscribe(channel.id, events);
147    return;
148  }
149
150  //----------------------------------------------------------
151  // Exported functions
152  //----------------------------------------------------------
153
154  /**
155   * Listen for changes to the connection state
156   */
157  getConnectionState() {
158    return this.connectionState.asObservable();
159  }
160
161  /**
162   * Watch for messages in a channel
163   */
164  getStream<T>(address: LiveChannelAddress): Observable<LiveChannelEvent<T>> {
165    return this.getChannel<T>(address).getStream();
166  }
167
168  /**
169   * Connect to a channel and return results as DataFrames
170   */
171  getDataStream(options: LiveDataStreamOptions): Observable<DataQueryResponse> {
172    return new Observable<DataQueryResponse>((subscriber) => {
173      const channel = this.getChannel(options.addr);
174      const key = options.key ?? `xstr/${streamCounter++}`;
175      let data: StreamingDataFrame | undefined = undefined;
176      let filtered: DataFrame | undefined = undefined;
177      let state = LoadingState.Streaming;
178      let lastWidth = -1;
179
180      const process = (msg: DataFrameJSON) => {
181        if (!data) {
182          data = new StreamingDataFrame(msg, options.buffer);
183        } else {
184          data.push(msg);
185        }
186        state = LoadingState.Streaming;
187        const sameWidth = lastWidth === data.fields.length;
188        lastWidth = data.fields.length;
189
190        // Filter out fields
191        if (!filtered || msg.schema || !sameWidth) {
192          filtered = data;
193          if (options.filter) {
194            const { fields } = options.filter;
195            if (fields?.length) {
196              filtered = {
197                ...data,
198                fields: data.fields.filter((f) => fields.includes(f.name)),
199              };
200            }
201          }
202        }
203
204        if (this.dataStreamSubscriberReady) {
205          filtered.length = data.length; // make sure they stay up-to-date
206          subscriber.next({
207            state,
208            data: [
209              // workaround for serializing issues when sending DataFrame from web worker to the main thread
210              // DataFrame is making use of ArrayVectors which are es6 classes and thus not cloneable out of the box
211              // `toDataFrameDTO` converts ArrayVectors into native arrays.
212              toDataFrameDTO(filtered),
213            ],
214            key,
215          });
216        }
217      };
218
219      if (options.frame) {
220        process(dataFrameToJSON(options.frame));
221      } else if (channel.lastMessageWithSchema) {
222        process(channel.lastMessageWithSchema);
223      }
224
225      const sub = channel.getStream().subscribe({
226        error: (err: any) => {
227          console.log('LiveQuery [error]', { err }, options.addr);
228          state = LoadingState.Error;
229          subscriber.next({ state, data: [data], key, error: toDataQueryError(err) });
230          sub.unsubscribe(); // close after error
231        },
232        complete: () => {
233          console.log('LiveQuery [complete]', options.addr);
234          if (state !== LoadingState.Error) {
235            state = LoadingState.Done;
236          }
237          // or track errors? subscriber.next({ state, data: [data], key });
238          subscriber.complete();
239          sub.unsubscribe();
240        },
241        next: (evt: LiveChannelEvent) => {
242          if (isLiveChannelMessageEvent(evt)) {
243            process(evt.message);
244            return;
245          }
246          if (isLiveChannelStatusEvent(evt)) {
247            if (evt.error) {
248              let error = toDataQueryError(evt.error);
249              error.message = `Streaming channel error: ${error.message}`;
250              state = LoadingState.Error;
251              subscriber.next({ state, data: [data], key, error });
252              return;
253            } else if (
254              evt.state === LiveChannelConnectionState.Connected ||
255              evt.state === LiveChannelConnectionState.Pending
256            ) {
257              if (evt.message) {
258                process(evt.message);
259              }
260              return;
261            }
262            console.log('ignore state', evt);
263          }
264        },
265      });
266
267      return () => {
268        sub.unsubscribe();
269      };
270    });
271  }
272
273  /**
274   * For channels that support presence, this will request the current state from the server.
275   *
276   * Join and leave messages will be sent to the open stream
277   */
278  getPresence(address: LiveChannelAddress): Promise<LiveChannelPresenceStatus> {
279    return this.getChannel(address).getPresence();
280  }
281}
282
283// This is used to give a unique key for each stream.  The actual value does not matter
284let streamCounter = 0;
285