1package protocol
2
3import (
4	"bytes"
5	"encoding/binary"
6	"errors"
7	"fmt"
8
9	fastJSON "github.com/segmentio/encoding/json"
10)
11
// errInvalidJSON is the error isValidJSON returns when its input does not
// pass JSON validation.
var errInvalidJSON = errors.New("invalid JSON data")
13
14// checks that JSON is valid.
15func isValidJSON(b []byte) error {
16	if b == nil {
17		return nil
18	}
19	if !fastJSON.Valid(b) {
20		return errInvalidJSON
21	}
22	return nil
23}
24
// PushEncoder encodes a Push and each individual push payload type to bytes.
// Every payload-specific method accepts optional trailing byte slices; how
// (and whether) they are used is left to the implementation.
type PushEncoder interface {
	// Encode encodes a complete Push.
	Encode(*Push) ([]byte, error)
	// The methods below each encode one push payload type.
	EncodeMessage(*Message, ...[]byte) ([]byte, error)
	EncodePublication(*Publication, ...[]byte) ([]byte, error)
	EncodeJoin(*Join, ...[]byte) ([]byte, error)
	EncodeLeave(*Leave, ...[]byte) ([]byte, error)
	EncodeUnsubscribe(*Unsubscribe, ...[]byte) ([]byte, error)
	EncodeSubscribe(*Subscribe, ...[]byte) ([]byte, error)
	EncodeConnect(*Connect, ...[]byte) ([]byte, error)
	EncodeDisconnect(*Disconnect, ...[]byte) ([]byte, error)
	EncodeRefresh(*Refresh, ...[]byte) ([]byte, error)
}
38
// Compile-time assertions that both push encoders implement PushEncoder.
var _ PushEncoder = (*JSONPushEncoder)(nil)
var _ PushEncoder = (*ProtobufPushEncoder)(nil)
41
// JSONPushEncoder is the JSON implementation of PushEncoder. It has no fields.
type JSONPushEncoder struct{}

// NewJSONPushEncoder returns a pointer to a new JSONPushEncoder.
func NewJSONPushEncoder() *JSONPushEncoder {
	return new(JSONPushEncoder)
}
50
51// Encode Push to bytes.
52func (e *JSONPushEncoder) Encode(message *Push) ([]byte, error) {
53	// Check data is valid JSON.
54	if err := isValidJSON(message.Data); err != nil {
55		return nil, err
56	}
57	jw := newWriter()
58	message.MarshalEasyJSON(jw)
59	return jw.BuildBytes()
60}
61
62// EncodePublication to bytes.
63func (e *JSONPushEncoder) EncodePublication(message *Publication, reuse ...[]byte) ([]byte, error) {
64	jw := newWriter()
65	message.MarshalEasyJSON(jw)
66	return jw.BuildBytes(reuse...)
67}
68
69// EncodeMessage to bytes.
70func (e *JSONPushEncoder) EncodeMessage(message *Message, reuse ...[]byte) ([]byte, error) {
71	jw := newWriter()
72	message.MarshalEasyJSON(jw)
73	return jw.BuildBytes(reuse...)
74}
75
76// EncodeJoin to bytes.
77func (e *JSONPushEncoder) EncodeJoin(message *Join, reuse ...[]byte) ([]byte, error) {
78	jw := newWriter()
79	message.MarshalEasyJSON(jw)
80	return jw.BuildBytes(reuse...)
81}
82
83// EncodeLeave to bytes.
84func (e *JSONPushEncoder) EncodeLeave(message *Leave, reuse ...[]byte) ([]byte, error) {
85	jw := newWriter()
86	message.MarshalEasyJSON(jw)
87	return jw.BuildBytes(reuse...)
88}
89
90// EncodeUnsubscribe to bytes.
91func (e *JSONPushEncoder) EncodeUnsubscribe(message *Unsubscribe, reuse ...[]byte) ([]byte, error) {
92	jw := newWriter()
93	message.MarshalEasyJSON(jw)
94	return jw.BuildBytes(reuse...)
95}
96
97// EncodeSubscribe to bytes.
98func (e *JSONPushEncoder) EncodeSubscribe(message *Subscribe, reuse ...[]byte) ([]byte, error) {
99	jw := newWriter()
100	message.MarshalEasyJSON(jw)
101	return jw.BuildBytes(reuse...)
102}
103
104// EncodeConnect to bytes.
105func (e *JSONPushEncoder) EncodeConnect(message *Connect, reuse ...[]byte) ([]byte, error) {
106	jw := newWriter()
107	message.MarshalEasyJSON(jw)
108	return jw.BuildBytes(reuse...)
109}
110
111// EncodeDisconnect to bytes.
112func (e *JSONPushEncoder) EncodeDisconnect(message *Disconnect, reuse ...[]byte) ([]byte, error) {
113	jw := newWriter()
114	message.MarshalEasyJSON(jw)
115	return jw.BuildBytes(reuse...)
116}
117
118// EncodeRefresh to bytes.
119func (e *JSONPushEncoder) EncodeRefresh(message *Refresh, reuse ...[]byte) ([]byte, error) {
120	jw := newWriter()
121	message.MarshalEasyJSON(jw)
122	return jw.BuildBytes(reuse...)
123}
124
// ProtobufPushEncoder is the Protobuf implementation of PushEncoder. It has
// no fields.
type ProtobufPushEncoder struct{}

// NewProtobufPushEncoder returns a pointer to a new ProtobufPushEncoder.
func NewProtobufPushEncoder() *ProtobufPushEncoder {
	return new(ProtobufPushEncoder)
}
133
// Encode marshals a Push by delegating to its MarshalVT method and returns
// that call's bytes and error unchanged.
func (e *ProtobufPushEncoder) Encode(message *Push) ([]byte, error) {
	return message.MarshalVT()
}
138
139// EncodePublication to bytes.
140func (e *ProtobufPushEncoder) EncodePublication(message *Publication, reuse ...[]byte) ([]byte, error) {
141	if len(reuse) == 1 {
142		size := message.SizeVT()
143		if cap(reuse[0]) >= size {
144			n, err := message.MarshalToSizedBufferVT(reuse[0][:size])
145			if err != nil {
146				return nil, err
147			}
148			return reuse[0][:n], nil
149		}
150	}
151	return message.MarshalVT()
152}
153
154// EncodeMessage to bytes.
155func (e *ProtobufPushEncoder) EncodeMessage(message *Message, reuse ...[]byte) ([]byte, error) {
156	if len(reuse) == 1 {
157		size := message.SizeVT()
158		if cap(reuse[0]) >= size {
159			n, err := message.MarshalToSizedBufferVT(reuse[0][:size])
160			if err != nil {
161				return nil, err
162			}
163			return reuse[0][:n], nil
164		}
165	}
166	return message.MarshalVT()
167}
168
169// EncodeJoin to bytes.
170func (e *ProtobufPushEncoder) EncodeJoin(message *Join, reuse ...[]byte) ([]byte, error) {
171	if len(reuse) == 1 {
172		size := message.SizeVT()
173		if cap(reuse[0]) >= size {
174			n, err := message.MarshalToSizedBufferVT(reuse[0][:size])
175			if err != nil {
176				return nil, err
177			}
178			return reuse[0][:n], nil
179		}
180	}
181	return message.MarshalVT()
182}
183
184// EncodeLeave to bytes.
185func (e *ProtobufPushEncoder) EncodeLeave(message *Leave, reuse ...[]byte) ([]byte, error) {
186	if len(reuse) == 1 {
187		size := message.SizeVT()
188		if cap(reuse[0]) >= size {
189			n, err := message.MarshalToSizedBufferVT(reuse[0][:size])
190			if err != nil {
191				return nil, err
192			}
193			return reuse[0][:n], nil
194		}
195	}
196	return message.MarshalVT()
197}
198
199// EncodeUnsubscribe to bytes.
200func (e *ProtobufPushEncoder) EncodeUnsubscribe(message *Unsubscribe, reuse ...[]byte) ([]byte, error) {
201	if len(reuse) == 1 {
202		size := message.SizeVT()
203		if cap(reuse[0]) >= size {
204			n, err := message.MarshalToSizedBufferVT(reuse[0][:size])
205			if err != nil {
206				return nil, err
207			}
208			return reuse[0][:n], nil
209		}
210	}
211	return message.MarshalVT()
212}
213
214// EncodeSubscribe to bytes.
215func (e *ProtobufPushEncoder) EncodeSubscribe(message *Subscribe, reuse ...[]byte) ([]byte, error) {
216	if len(reuse) == 1 {
217		size := message.SizeVT()
218		if cap(reuse[0]) >= size {
219			n, err := message.MarshalToSizedBufferVT(reuse[0][:size])
220			if err != nil {
221				return nil, err
222			}
223			return reuse[0][:n], nil
224		}
225	}
226	return message.MarshalVT()
227}
228
229// EncodeConnect to bytes.
230func (e *ProtobufPushEncoder) EncodeConnect(message *Connect, reuse ...[]byte) ([]byte, error) {
231	if len(reuse) == 1 {
232		size := message.SizeVT()
233		if cap(reuse[0]) >= size {
234			n, err := message.MarshalToSizedBufferVT(reuse[0][:size])
235			if err != nil {
236				return nil, err
237			}
238			return reuse[0][:n], nil
239		}
240	}
241	return message.MarshalVT()
242}
243
244// EncodeDisconnect to bytes.
245func (e *ProtobufPushEncoder) EncodeDisconnect(message *Disconnect, reuse ...[]byte) ([]byte, error) {
246	if len(reuse) == 1 {
247		size := message.SizeVT()
248		if cap(reuse[0]) >= size {
249			n, err := message.MarshalToSizedBufferVT(reuse[0][:size])
250			if err != nil {
251				return nil, err
252			}
253			return reuse[0][:n], nil
254		}
255	}
256	return message.MarshalVT()
257}
258
259// EncodeRefresh to bytes.
260func (e *ProtobufPushEncoder) EncodeRefresh(message *Refresh, reuse ...[]byte) ([]byte, error) {
261	if len(reuse) == 1 {
262		size := message.SizeVT()
263		if cap(reuse[0]) >= size {
264			n, err := message.MarshalToSizedBufferVT(reuse[0][:size])
265			if err != nil {
266				return nil, err
267			}
268			return reuse[0][:n], nil
269		}
270	}
271	return message.MarshalVT()
272}
273
// ReplyEncoder encodes a Reply to bytes.
type ReplyEncoder interface {
	// Encode returns the encoded form of the Reply, or an error.
	Encode(*Reply) ([]byte, error)
}
278
// JSONReplyEncoder encodes Reply values as JSON. It has no fields.
type JSONReplyEncoder struct{}

// NewJSONReplyEncoder returns a pointer to a new JSONReplyEncoder.
func NewJSONReplyEncoder() *JSONReplyEncoder {
	return new(JSONReplyEncoder)
}
286
287// Encode Reply to bytes.
288func (e *JSONReplyEncoder) Encode(r *Reply) ([]byte, error) {
289	if r.Id != 0 {
290		// Only check command result reply. Push reply JSON validation is done in PushEncoder.
291		if err := isValidJSON(r.Result); err != nil {
292			return nil, err
293		}
294	}
295	jw := newWriter()
296	r.MarshalEasyJSON(jw)
297	return jw.BuildBytes()
298}
299
// ProtobufReplyEncoder encodes Reply values as Protobuf. It has no fields.
type ProtobufReplyEncoder struct{}

// NewProtobufReplyEncoder returns a pointer to a new ProtobufReplyEncoder.
func NewProtobufReplyEncoder() *ProtobufReplyEncoder {
	return new(ProtobufReplyEncoder)
}
307
// Encode marshals a Reply by delegating to its MarshalVT method and returns
// that call's bytes and error unchanged.
func (e *ProtobufReplyEncoder) Encode(r *Reply) ([]byte, error) {
	return r.MarshalVT()
}
312
// DataEncoder accumulates a sequence of byte chunks into a single byte slice.
type DataEncoder interface {
	// Reset discards everything accumulated so far.
	Reset()
	// Encode appends one chunk to the accumulated output.
	Encode([]byte) error
	// Finish returns the accumulated output.
	Finish() []byte
}
319
// JSONDataEncoder concatenates byte chunks, separating consecutive chunks
// with a single newline.
type JSONDataEncoder struct {
	count  int          // number of chunks written since the last Reset
	buffer bytes.Buffer // accumulated output
}

// NewJSONDataEncoder returns an empty JSONDataEncoder.
func NewJSONDataEncoder() *JSONDataEncoder {
	return new(JSONDataEncoder)
}

// Reset empties the buffer and clears the chunk counter.
func (e *JSONDataEncoder) Reset() {
	e.buffer.Reset()
	e.count = 0
}

// Encode appends data to the buffer. Every chunk after the first is preceded
// by a newline. It always returns nil.
func (e *JSONDataEncoder) Encode(data []byte) error {
	if e.count > 0 {
		e.buffer.WriteByte('\n')
	}
	e.buffer.Write(data)
	e.count++
	return nil
}

// Finish returns a copy of the accumulated bytes; the returned slice does not
// share memory with the internal buffer.
func (e *JSONDataEncoder) Finish() []byte {
	out := make([]byte, e.buffer.Len())
	copy(out, e.buffer.Bytes())
	return out
}
354
// ProtobufDataEncoder concatenates byte chunks, prefixing each chunk with its
// length encoded as an unsigned varint.
type ProtobufDataEncoder struct {
	buffer bytes.Buffer // accumulated output
}

// NewProtobufDataEncoder returns an empty ProtobufDataEncoder.
func NewProtobufDataEncoder() *ProtobufDataEncoder {
	return &ProtobufDataEncoder{}
}

// Encode appends data to the buffer, preceded by len(data) written as an
// unsigned varint. It always returns nil.
func (e *ProtobufDataEncoder) Encode(data []byte) error {
	// binary.PutUvarint panics if the destination is too small, and a uint64
	// can need up to binary.MaxVarintLen64 bytes, so size the scratch space
	// accordingly.
	var prefix [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(prefix[:], uint64(len(data)))
	e.buffer.Write(prefix[:n])
	e.buffer.Write(data)
	return nil
}

// Reset empties the buffer.
func (e *ProtobufDataEncoder) Reset() {
	e.buffer.Reset()
}

// Finish returns a copy of the accumulated bytes; the returned slice does not
// share memory with the internal buffer.
func (e *ProtobufDataEncoder) Finish() []byte {
	data := e.buffer.Bytes()
	dataCopy := make([]byte, len(data))
	copy(dataCopy, data)
	return dataCopy
}
386
// ResultEncoder encodes each command result type to bytes. Every method takes
// a pointer to one result type and returns its encoded form or an error.
type ResultEncoder interface {
	EncodeConnectResult(*ConnectResult) ([]byte, error)
	EncodeRefreshResult(*RefreshResult) ([]byte, error)
	EncodeSubscribeResult(*SubscribeResult) ([]byte, error)
	EncodeSubRefreshResult(*SubRefreshResult) ([]byte, error)
	EncodeUnsubscribeResult(*UnsubscribeResult) ([]byte, error)
	EncodePublishResult(*PublishResult) ([]byte, error)
	EncodePresenceResult(*PresenceResult) ([]byte, error)
	EncodePresenceStatsResult(*PresenceStatsResult) ([]byte, error)
	EncodeHistoryResult(*HistoryResult) ([]byte, error)
	EncodePingResult(*PingResult) ([]byte, error)
	EncodeRPCResult(*RPCResult) ([]byte, error)
}
401
// JSONResultEncoder encodes command results as JSON. It has no fields.
type JSONResultEncoder struct{}

// NewJSONResultEncoder returns a pointer to a new JSONResultEncoder.
func NewJSONResultEncoder() *JSONResultEncoder {
	return new(JSONResultEncoder)
}
409
410// EncodeConnectResult ...
411func (e *JSONResultEncoder) EncodeConnectResult(res *ConnectResult) ([]byte, error) {
412	jw := newWriter()
413	res.MarshalEasyJSON(jw)
414	return jw.BuildBytes()
415}
416
417// EncodeRefreshResult ...
418func (e *JSONResultEncoder) EncodeRefreshResult(res *RefreshResult) ([]byte, error) {
419	jw := newWriter()
420	res.MarshalEasyJSON(jw)
421	return jw.BuildBytes()
422}
423
424// EncodeSubscribeResult ...
425func (e *JSONResultEncoder) EncodeSubscribeResult(res *SubscribeResult) ([]byte, error) {
426	jw := newWriter()
427	res.MarshalEasyJSON(jw)
428	return jw.BuildBytes()
429}
430
431// EncodeSubRefreshResult ...
432func (e *JSONResultEncoder) EncodeSubRefreshResult(res *SubRefreshResult) ([]byte, error) {
433	jw := newWriter()
434	res.MarshalEasyJSON(jw)
435	return jw.BuildBytes()
436}
437
438// EncodeUnsubscribeResult ...
439func (e *JSONResultEncoder) EncodeUnsubscribeResult(res *UnsubscribeResult) ([]byte, error) {
440	jw := newWriter()
441	res.MarshalEasyJSON(jw)
442	return jw.BuildBytes()
443}
444
445// EncodePublishResult ...
446func (e *JSONResultEncoder) EncodePublishResult(res *PublishResult) ([]byte, error) {
447	jw := newWriter()
448	res.MarshalEasyJSON(jw)
449	return jw.BuildBytes()
450}
451
452// EncodePresenceResult ...
453func (e *JSONResultEncoder) EncodePresenceResult(res *PresenceResult) ([]byte, error) {
454	jw := newWriter()
455	res.MarshalEasyJSON(jw)
456	return jw.BuildBytes()
457}
458
459// EncodePresenceStatsResult ...
460func (e *JSONResultEncoder) EncodePresenceStatsResult(res *PresenceStatsResult) ([]byte, error) {
461	jw := newWriter()
462	res.MarshalEasyJSON(jw)
463	return jw.BuildBytes()
464}
465
466// EncodeHistoryResult ...
467func (e *JSONResultEncoder) EncodeHistoryResult(res *HistoryResult) ([]byte, error) {
468	jw := newWriter()
469	res.MarshalEasyJSON(jw)
470	return jw.BuildBytes()
471}
472
473// EncodePingResult ...
474func (e *JSONResultEncoder) EncodePingResult(res *PingResult) ([]byte, error) {
475	jw := newWriter()
476	res.MarshalEasyJSON(jw)
477	return jw.BuildBytes()
478}
479
480// EncodeRPCResult ...
481func (e *JSONResultEncoder) EncodeRPCResult(res *RPCResult) ([]byte, error) {
482	jw := newWriter()
483	res.MarshalEasyJSON(jw)
484	return jw.BuildBytes()
485}
486
// ProtobufResultEncoder encodes command results as Protobuf. It has no fields.
type ProtobufResultEncoder struct{}

// NewProtobufResultEncoder returns a pointer to a new ProtobufResultEncoder.
func NewProtobufResultEncoder() *ProtobufResultEncoder {
	return new(ProtobufResultEncoder)
}
494
// EncodeConnectResult marshals a ConnectResult by delegating to its MarshalVT
// method and returns that call's bytes and error unchanged.
func (e *ProtobufResultEncoder) EncodeConnectResult(res *ConnectResult) ([]byte, error) {
	return res.MarshalVT()
}
499
// EncodeRefreshResult marshals a RefreshResult by delegating to its MarshalVT
// method and returns that call's bytes and error unchanged.
func (e *ProtobufResultEncoder) EncodeRefreshResult(res *RefreshResult) ([]byte, error) {
	return res.MarshalVT()
}
504
// EncodeSubscribeResult marshals a SubscribeResult by delegating to its
// MarshalVT method and returns that call's bytes and error unchanged.
func (e *ProtobufResultEncoder) EncodeSubscribeResult(res *SubscribeResult) ([]byte, error) {
	return res.MarshalVT()
}
509
// EncodeSubRefreshResult marshals a SubRefreshResult by delegating to its
// MarshalVT method and returns that call's bytes and error unchanged.
func (e *ProtobufResultEncoder) EncodeSubRefreshResult(res *SubRefreshResult) ([]byte, error) {
	return res.MarshalVT()
}
514
// EncodeUnsubscribeResult marshals an UnsubscribeResult by delegating to its
// MarshalVT method and returns that call's bytes and error unchanged.
func (e *ProtobufResultEncoder) EncodeUnsubscribeResult(res *UnsubscribeResult) ([]byte, error) {
	return res.MarshalVT()
}
519
// EncodePublishResult marshals a PublishResult by delegating to its MarshalVT
// method and returns that call's bytes and error unchanged.
func (e *ProtobufResultEncoder) EncodePublishResult(res *PublishResult) ([]byte, error) {
	return res.MarshalVT()
}
524
// EncodePresenceResult marshals a PresenceResult by delegating to its
// MarshalVT method and returns that call's bytes and error unchanged.
func (e *ProtobufResultEncoder) EncodePresenceResult(res *PresenceResult) ([]byte, error) {
	return res.MarshalVT()
}
529
// EncodePresenceStatsResult marshals a PresenceStatsResult by delegating to
// its MarshalVT method and returns that call's bytes and error unchanged.
func (e *ProtobufResultEncoder) EncodePresenceStatsResult(res *PresenceStatsResult) ([]byte, error) {
	return res.MarshalVT()
}
534
// EncodeHistoryResult marshals a HistoryResult by delegating to its MarshalVT
// method and returns that call's bytes and error unchanged.
func (e *ProtobufResultEncoder) EncodeHistoryResult(res *HistoryResult) ([]byte, error) {
	return res.MarshalVT()
}
539
// EncodePingResult marshals a PingResult by delegating to its MarshalVT
// method and returns that call's bytes and error unchanged.
func (e *ProtobufResultEncoder) EncodePingResult(res *PingResult) ([]byte, error) {
	return res.MarshalVT()
}
544
// EncodeRPCResult marshals an RPCResult by delegating to its MarshalVT method
// and returns that call's bytes and error unchanged.
func (e *ProtobufResultEncoder) EncodeRPCResult(res *RPCResult) ([]byte, error) {
	return res.MarshalVT()
}
549
// CommandEncoder encodes a Command to bytes.
type CommandEncoder interface {
	// Encode returns the encoded form of cmd, or an error.
	Encode(cmd *Command) ([]byte, error)
}
554
// JSONCommandEncoder encodes Command values as JSON. It has no fields.
type JSONCommandEncoder struct{}

// NewJSONCommandEncoder returns a pointer to a new JSONCommandEncoder.
func NewJSONCommandEncoder() *JSONCommandEncoder {
	return new(JSONCommandEncoder)
}
563
564// Encode ...
565func (e *JSONCommandEncoder) Encode(cmd *Command) ([]byte, error) {
566	jw := newWriter()
567	cmd.MarshalEasyJSON(jw)
568	return jw.BuildBytes()
569}
570
// ProtobufCommandEncoder encodes Command values as length-prefixed Protobuf.
// It has no fields.
type ProtobufCommandEncoder struct{}

// NewProtobufCommandEncoder returns a pointer to a new ProtobufCommandEncoder.
func NewProtobufCommandEncoder() *ProtobufCommandEncoder {
	return new(ProtobufCommandEncoder)
}
579
580// Encode ...
581func (e *ProtobufCommandEncoder) Encode(cmd *Command) ([]byte, error) {
582	commandBytes, err := cmd.MarshalVT()
583	if err != nil {
584		return nil, err
585	}
586	bs := make([]byte, 8)
587	n := binary.PutUvarint(bs, uint64(len(commandBytes)))
588	var buf bytes.Buffer
589	buf.Write(bs[:n])
590	buf.Write(commandBytes)
591	return buf.Bytes(), nil
592}
593
// ParamsEncoder encodes a request value of arbitrary type to bytes.
type ParamsEncoder interface {
	// Encode returns the encoded form of request, or an error.
	Encode(request interface{}) ([]byte, error)
}
598
// Compile-time assertion that *JSONParamsEncoder implements ParamsEncoder.
var _ ParamsEncoder = NewJSONParamsEncoder()
600
// JSONParamsEncoder encodes request params as JSON. It has no fields.
type JSONParamsEncoder struct{}

// NewJSONParamsEncoder returns a pointer to a new JSONParamsEncoder.
func NewJSONParamsEncoder() *JSONParamsEncoder {
	return new(JSONParamsEncoder)
}
608
// Encode marshals r to JSON by delegating to fastJSON.Marshal and returns
// that call's bytes and error unchanged.
func (d *JSONParamsEncoder) Encode(r interface{}) ([]byte, error) {
	return fastJSON.Marshal(r)
}
613
// Compile-time assertion that *ProtobufParamsEncoder implements ParamsEncoder.
var _ ParamsEncoder = NewProtobufParamsEncoder()
615
// ProtobufParamsEncoder encodes request params as Protobuf. It has no fields.
type ProtobufParamsEncoder struct{}

// NewProtobufParamsEncoder returns a pointer to a new ProtobufParamsEncoder.
func NewProtobufParamsEncoder() *ProtobufParamsEncoder {
	return new(ProtobufParamsEncoder)
}

// vtMarshaler is the method set a value must provide for
// ProtobufParamsEncoder.Encode to be able to marshal it.
type vtMarshaler interface {
	MarshalVT() (dAtA []byte, err error)
}

// Encode marshals r by calling its MarshalVT method. If r does not implement
// vtMarshaler, an error naming r's dynamic type is returned instead.
func (d *ProtobufParamsEncoder) Encode(r interface{}) ([]byte, error) {
	if m, ok := r.(vtMarshaler); ok {
		return m.MarshalVT()
	}
	return nil, fmt.Errorf("can not marshal type %T to Protobuf", r)
}
636