1import Centrifuge from 'centrifuge/dist/centrifuge'; 2import { LiveDataStreamOptions } from '@grafana/runtime'; 3import { toDataQueryError } from '@grafana/runtime/src/utils/toDataQueryError'; 4import { BehaviorSubject, Observable } from 'rxjs'; 5import { 6 DataFrame, 7 DataFrameJSON, 8 dataFrameToJSON, 9 DataQueryResponse, 10 isLiveChannelMessageEvent, 11 isLiveChannelStatusEvent, 12 LiveChannelAddress, 13 LiveChannelConnectionState, 14 LiveChannelEvent, 15 LiveChannelPresenceStatus, 16 LoadingState, 17 StreamingDataFrame, 18 toDataFrameDTO, 19} from '@grafana/data'; 20import { CentrifugeLiveChannel } from './channel'; 21 22export type CentrifugeSrvDeps = { 23 appUrl: string; 24 orgId: number; 25 orgRole: string; 26 sessionId: string; 27 liveEnabled: boolean; 28 dataStreamSubscriberReadiness: Observable<boolean>; 29}; 30 31export interface CentrifugeSrv { 32 /** 33 * Listen for changes to the connection state 34 */ 35 getConnectionState(): Observable<boolean>; 36 37 /** 38 * Watch for messages in a channel 39 */ 40 getStream<T>(address: LiveChannelAddress): Observable<LiveChannelEvent<T>>; 41 42 /** 43 * Connect to a channel and return results as DataFrames 44 */ 45 getDataStream(options: LiveDataStreamOptions): Observable<DataQueryResponse>; 46 47 /** 48 * For channels that support presence, this will request the current state from the server. 49 * 50 * Join and leave messages will be sent to the open stream 51 */ 52 getPresence(address: LiveChannelAddress): Promise<LiveChannelPresenceStatus>; 53} 54 55export class CentrifugeService implements CentrifugeSrv { 56 readonly open = new Map<string, CentrifugeLiveChannel>(); 57 readonly centrifuge: Centrifuge; 58 readonly connectionState: BehaviorSubject<boolean>; 59 readonly connectionBlocker: Promise<void>; 60 private dataStreamSubscriberReady = true; 61 62 constructor(private deps: CentrifugeSrvDeps) { 63 deps.dataStreamSubscriberReadiness.subscribe((next) => (this.dataStreamSubscriberReady = next)); 64 const liveUrl = `${deps.appUrl.replace(/^http/, 'ws')}/api/live/ws`; 65 this.centrifuge = new Centrifuge(liveUrl, {}); 66 this.centrifuge.setConnectData({ 67 sessionId: deps.sessionId, 68 orgId: deps.orgId, 69 }); 70 // orgRole is set when logged in *or* anonomus users can use grafana 71 if (deps.liveEnabled && deps.orgRole !== '') { 72 this.centrifuge.connect(); // do connection 73 } 74 this.connectionState = new BehaviorSubject<boolean>(this.centrifuge.isConnected()); 75 this.connectionBlocker = new Promise<void>((resolve) => { 76 if (this.centrifuge.isConnected()) { 77 return resolve(); 78 } 79 const connectListener = () => { 80 resolve(); 81 this.centrifuge.removeListener('connect', connectListener); 82 }; 83 this.centrifuge.addListener('connect', connectListener); 84 }); 85 86 // Register global listeners 87 this.centrifuge.on('connect', this.onConnect); 88 this.centrifuge.on('disconnect', this.onDisconnect); 89 this.centrifuge.on('publish', this.onServerSideMessage); 90 } 91 92 //---------------------------------------------------------- 93 // Internal functions 94 //---------------------------------------------------------- 95 96 private onConnect = (context: any) => { 97 this.connectionState.next(true); 98 }; 99 100 private onDisconnect = (context: any) => { 101 this.connectionState.next(false); 102 }; 103 104 private onServerSideMessage = (context: any) => { 105 console.log('Publication from server-side channel', context); 106 }; 107 108 /** 109 * Get a channel. If the scope, namespace, or path is invalid, a shutdown 110 * channel will be returned with an error state indicated in its status 111 */ 112 private getChannel<TMessage>(addr: LiveChannelAddress): CentrifugeLiveChannel<TMessage> { 113 const id = `${this.deps.orgId}/${addr.scope}/${addr.namespace}/${addr.path}`; 114 let channel = this.open.get(id); 115 if (channel != null) { 116 return channel; 117 } 118 119 channel = new CentrifugeLiveChannel(id, addr); 120 if (channel.currentStatus.state === LiveChannelConnectionState.Invalid) { 121 return channel; 122 } 123 channel.shutdownCallback = () => { 124 this.open.delete(id); // remove it from the list of open channels 125 }; 126 this.open.set(id, channel); 127 128 // Initialize the channel in the background 129 this.initChannel(channel).catch((err) => { 130 if (channel) { 131 channel.currentStatus.state = LiveChannelConnectionState.Invalid; 132 channel.shutdownWithError(err); 133 } 134 this.open.delete(id); 135 }); 136 137 // return the not-yet initalized channel 138 return channel; 139 } 140 141 private async initChannel(channel: CentrifugeLiveChannel): Promise<void> { 142 const events = channel.initalize(); 143 if (!this.centrifuge.isConnected()) { 144 await this.connectionBlocker; 145 } 146 channel.subscription = this.centrifuge.subscribe(channel.id, events); 147 return; 148 } 149 150 //---------------------------------------------------------- 151 // Exported functions 152 //---------------------------------------------------------- 153 154 /** 155 * Listen for changes to the connection state 156 */ 157 getConnectionState() { 158 return this.connectionState.asObservable(); 159 } 160 161 /** 162 * Watch for messages in a channel 163 */ 164 getStream<T>(address: LiveChannelAddress): Observable<LiveChannelEvent<T>> { 165 return this.getChannel<T>(address).getStream(); 166 } 167 168 /** 169 * Connect to a channel and return results as DataFrames 170 */ 171 getDataStream(options: LiveDataStreamOptions): Observable<DataQueryResponse> { 172 return new Observable<DataQueryResponse>((subscriber) => { 173 const channel = this.getChannel(options.addr); 174 const key = options.key ?? `xstr/${streamCounter++}`; 175 let data: StreamingDataFrame | undefined = undefined; 176 let filtered: DataFrame | undefined = undefined; 177 let state = LoadingState.Streaming; 178 let lastWidth = -1; 179 180 const process = (msg: DataFrameJSON) => { 181 if (!data) { 182 data = new StreamingDataFrame(msg, options.buffer); 183 } else { 184 data.push(msg); 185 } 186 state = LoadingState.Streaming; 187 const sameWidth = lastWidth === data.fields.length; 188 lastWidth = data.fields.length; 189 190 // Filter out fields 191 if (!filtered || msg.schema || !sameWidth) { 192 filtered = data; 193 if (options.filter) { 194 const { fields } = options.filter; 195 if (fields?.length) { 196 filtered = { 197 ...data, 198 fields: data.fields.filter((f) => fields.includes(f.name)), 199 }; 200 } 201 } 202 } 203 204 if (this.dataStreamSubscriberReady) { 205 filtered.length = data.length; // make sure they stay up-to-date 206 subscriber.next({ 207 state, 208 data: [ 209 // workaround for serializing issues when sending DataFrame from web worker to the main thread 210 // DataFrame is making use of ArrayVectors which are es6 classes and thus not cloneable out of the box 211 // `toDataFrameDTO` converts ArrayVectors into native arrays. 212 toDataFrameDTO(filtered), 213 ], 214 key, 215 }); 216 } 217 }; 218 219 if (options.frame) { 220 process(dataFrameToJSON(options.frame)); 221 } else if (channel.lastMessageWithSchema) { 222 process(channel.lastMessageWithSchema); 223 } 224 225 const sub = channel.getStream().subscribe({ 226 error: (err: any) => { 227 console.log('LiveQuery [error]', { err }, options.addr); 228 state = LoadingState.Error; 229 subscriber.next({ state, data: [data], key, error: toDataQueryError(err) }); 230 sub.unsubscribe(); // close after error 231 }, 232 complete: () => { 233 console.log('LiveQuery [complete]', options.addr); 234 if (state !== LoadingState.Error) { 235 state = LoadingState.Done; 236 } 237 // or track errors? subscriber.next({ state, data: [data], key }); 238 subscriber.complete(); 239 sub.unsubscribe(); 240 }, 241 next: (evt: LiveChannelEvent) => { 242 if (isLiveChannelMessageEvent(evt)) { 243 process(evt.message); 244 return; 245 } 246 if (isLiveChannelStatusEvent(evt)) { 247 if (evt.error) { 248 let error = toDataQueryError(evt.error); 249 error.message = `Streaming channel error: ${error.message}`; 250 state = LoadingState.Error; 251 subscriber.next({ state, data: [data], key, error }); 252 return; 253 } else if ( 254 evt.state === LiveChannelConnectionState.Connected || 255 evt.state === LiveChannelConnectionState.Pending 256 ) { 257 if (evt.message) { 258 process(evt.message); 259 } 260 return; 261 } 262 console.log('ignore state', evt); 263 } 264 }, 265 }); 266 267 return () => { 268 sub.unsubscribe(); 269 }; 270 }); 271 } 272 273 /** 274 * For channels that support presence, this will request the current state from the server. 275 * 276 * Join and leave messages will be sent to the open stream 277 */ 278 getPresence(address: LiveChannelAddress): Promise<LiveChannelPresenceStatus> { 279 return this.getChannel(address).getPresence(); 280 } 281} 282 283// This is used to give a unique key for each stream. The actual value does not matter 284let streamCounter = 0; 285