1package protocol 2 3import ( 4 "bytes" 5 "encoding/binary" 6 "errors" 7 "fmt" 8 9 fastJSON "github.com/segmentio/encoding/json" 10) 11 12var errInvalidJSON = errors.New("invalid JSON data") 13 14// checks that JSON is valid. 15func isValidJSON(b []byte) error { 16 if b == nil { 17 return nil 18 } 19 if !fastJSON.Valid(b) { 20 return errInvalidJSON 21 } 22 return nil 23} 24 25// PushEncoder ... 26type PushEncoder interface { 27 Encode(*Push) ([]byte, error) 28 EncodeMessage(*Message, ...[]byte) ([]byte, error) 29 EncodePublication(*Publication, ...[]byte) ([]byte, error) 30 EncodeJoin(*Join, ...[]byte) ([]byte, error) 31 EncodeLeave(*Leave, ...[]byte) ([]byte, error) 32 EncodeUnsubscribe(*Unsubscribe, ...[]byte) ([]byte, error) 33 EncodeSubscribe(*Subscribe, ...[]byte) ([]byte, error) 34 EncodeConnect(*Connect, ...[]byte) ([]byte, error) 35 EncodeDisconnect(*Disconnect, ...[]byte) ([]byte, error) 36 EncodeRefresh(*Refresh, ...[]byte) ([]byte, error) 37} 38 39var _ PushEncoder = (*JSONPushEncoder)(nil) 40var _ PushEncoder = (*ProtobufPushEncoder)(nil) 41 42// JSONPushEncoder ... 43type JSONPushEncoder struct { 44} 45 46// NewJSONPushEncoder ... 47func NewJSONPushEncoder() *JSONPushEncoder { 48 return &JSONPushEncoder{} 49} 50 51// Encode Push to bytes. 52func (e *JSONPushEncoder) Encode(message *Push) ([]byte, error) { 53 // Check data is valid JSON. 54 if err := isValidJSON(message.Data); err != nil { 55 return nil, err 56 } 57 jw := newWriter() 58 message.MarshalEasyJSON(jw) 59 return jw.BuildBytes() 60} 61 62// EncodePublication to bytes. 63func (e *JSONPushEncoder) EncodePublication(message *Publication, reuse ...[]byte) ([]byte, error) { 64 jw := newWriter() 65 message.MarshalEasyJSON(jw) 66 return jw.BuildBytes(reuse...) 67} 68 69// EncodeMessage to bytes. 70func (e *JSONPushEncoder) EncodeMessage(message *Message, reuse ...[]byte) ([]byte, error) { 71 jw := newWriter() 72 message.MarshalEasyJSON(jw) 73 return jw.BuildBytes(reuse...) 74} 75 76// EncodeJoin to bytes. 77func (e *JSONPushEncoder) EncodeJoin(message *Join, reuse ...[]byte) ([]byte, error) { 78 jw := newWriter() 79 message.MarshalEasyJSON(jw) 80 return jw.BuildBytes(reuse...) 81} 82 83// EncodeLeave to bytes. 84func (e *JSONPushEncoder) EncodeLeave(message *Leave, reuse ...[]byte) ([]byte, error) { 85 jw := newWriter() 86 message.MarshalEasyJSON(jw) 87 return jw.BuildBytes(reuse...) 88} 89 90// EncodeUnsubscribe to bytes. 91func (e *JSONPushEncoder) EncodeUnsubscribe(message *Unsubscribe, reuse ...[]byte) ([]byte, error) { 92 jw := newWriter() 93 message.MarshalEasyJSON(jw) 94 return jw.BuildBytes(reuse...) 95} 96 97// EncodeSubscribe to bytes. 98func (e *JSONPushEncoder) EncodeSubscribe(message *Subscribe, reuse ...[]byte) ([]byte, error) { 99 jw := newWriter() 100 message.MarshalEasyJSON(jw) 101 return jw.BuildBytes(reuse...) 102} 103 104// EncodeConnect to bytes. 105func (e *JSONPushEncoder) EncodeConnect(message *Connect, reuse ...[]byte) ([]byte, error) { 106 jw := newWriter() 107 message.MarshalEasyJSON(jw) 108 return jw.BuildBytes(reuse...) 109} 110 111// EncodeDisconnect to bytes. 112func (e *JSONPushEncoder) EncodeDisconnect(message *Disconnect, reuse ...[]byte) ([]byte, error) { 113 jw := newWriter() 114 message.MarshalEasyJSON(jw) 115 return jw.BuildBytes(reuse...) 116} 117 118// EncodeRefresh to bytes. 119func (e *JSONPushEncoder) EncodeRefresh(message *Refresh, reuse ...[]byte) ([]byte, error) { 120 jw := newWriter() 121 message.MarshalEasyJSON(jw) 122 return jw.BuildBytes(reuse...) 123} 124 125// ProtobufPushEncoder ... 126type ProtobufPushEncoder struct { 127} 128 129// NewProtobufPushEncoder ... 130func NewProtobufPushEncoder() *ProtobufPushEncoder { 131 return &ProtobufPushEncoder{} 132} 133 134// Encode Push to bytes. 135func (e *ProtobufPushEncoder) Encode(message *Push) ([]byte, error) { 136 return message.MarshalVT() 137} 138 139// EncodePublication to bytes. 140func (e *ProtobufPushEncoder) EncodePublication(message *Publication, reuse ...[]byte) ([]byte, error) { 141 if len(reuse) == 1 { 142 size := message.SizeVT() 143 if cap(reuse[0]) >= size { 144 n, err := message.MarshalToSizedBufferVT(reuse[0][:size]) 145 if err != nil { 146 return nil, err 147 } 148 return reuse[0][:n], nil 149 } 150 } 151 return message.MarshalVT() 152} 153 154// EncodeMessage to bytes. 155func (e *ProtobufPushEncoder) EncodeMessage(message *Message, reuse ...[]byte) ([]byte, error) { 156 if len(reuse) == 1 { 157 size := message.SizeVT() 158 if cap(reuse[0]) >= size { 159 n, err := message.MarshalToSizedBufferVT(reuse[0][:size]) 160 if err != nil { 161 return nil, err 162 } 163 return reuse[0][:n], nil 164 } 165 } 166 return message.MarshalVT() 167} 168 169// EncodeJoin to bytes. 170func (e *ProtobufPushEncoder) EncodeJoin(message *Join, reuse ...[]byte) ([]byte, error) { 171 if len(reuse) == 1 { 172 size := message.SizeVT() 173 if cap(reuse[0]) >= size { 174 n, err := message.MarshalToSizedBufferVT(reuse[0][:size]) 175 if err != nil { 176 return nil, err 177 } 178 return reuse[0][:n], nil 179 } 180 } 181 return message.MarshalVT() 182} 183 184// EncodeLeave to bytes. 185func (e *ProtobufPushEncoder) EncodeLeave(message *Leave, reuse ...[]byte) ([]byte, error) { 186 if len(reuse) == 1 { 187 size := message.SizeVT() 188 if cap(reuse[0]) >= size { 189 n, err := message.MarshalToSizedBufferVT(reuse[0][:size]) 190 if err != nil { 191 return nil, err 192 } 193 return reuse[0][:n], nil 194 } 195 } 196 return message.MarshalVT() 197} 198 199// EncodeUnsubscribe to bytes. 200func (e *ProtobufPushEncoder) EncodeUnsubscribe(message *Unsubscribe, reuse ...[]byte) ([]byte, error) { 201 if len(reuse) == 1 { 202 size := message.SizeVT() 203 if cap(reuse[0]) >= size { 204 n, err := message.MarshalToSizedBufferVT(reuse[0][:size]) 205 if err != nil { 206 return nil, err 207 } 208 return reuse[0][:n], nil 209 } 210 } 211 return message.MarshalVT() 212} 213 214// EncodeSubscribe to bytes. 215func (e *ProtobufPushEncoder) EncodeSubscribe(message *Subscribe, reuse ...[]byte) ([]byte, error) { 216 if len(reuse) == 1 { 217 size := message.SizeVT() 218 if cap(reuse[0]) >= size { 219 n, err := message.MarshalToSizedBufferVT(reuse[0][:size]) 220 if err != nil { 221 return nil, err 222 } 223 return reuse[0][:n], nil 224 } 225 } 226 return message.MarshalVT() 227} 228 229// EncodeConnect to bytes. 230func (e *ProtobufPushEncoder) EncodeConnect(message *Connect, reuse ...[]byte) ([]byte, error) { 231 if len(reuse) == 1 { 232 size := message.SizeVT() 233 if cap(reuse[0]) >= size { 234 n, err := message.MarshalToSizedBufferVT(reuse[0][:size]) 235 if err != nil { 236 return nil, err 237 } 238 return reuse[0][:n], nil 239 } 240 } 241 return message.MarshalVT() 242} 243 244// EncodeDisconnect to bytes. 245func (e *ProtobufPushEncoder) EncodeDisconnect(message *Disconnect, reuse ...[]byte) ([]byte, error) { 246 if len(reuse) == 1 { 247 size := message.SizeVT() 248 if cap(reuse[0]) >= size { 249 n, err := message.MarshalToSizedBufferVT(reuse[0][:size]) 250 if err != nil { 251 return nil, err 252 } 253 return reuse[0][:n], nil 254 } 255 } 256 return message.MarshalVT() 257} 258 259// EncodeRefresh to bytes. 260func (e *ProtobufPushEncoder) EncodeRefresh(message *Refresh, reuse ...[]byte) ([]byte, error) { 261 if len(reuse) == 1 { 262 size := message.SizeVT() 263 if cap(reuse[0]) >= size { 264 n, err := message.MarshalToSizedBufferVT(reuse[0][:size]) 265 if err != nil { 266 return nil, err 267 } 268 return reuse[0][:n], nil 269 } 270 } 271 return message.MarshalVT() 272} 273 274// ReplyEncoder ... 275type ReplyEncoder interface { 276 Encode(*Reply) ([]byte, error) 277} 278 279// JSONReplyEncoder ... 280type JSONReplyEncoder struct{} 281 282// NewJSONReplyEncoder ... 283func NewJSONReplyEncoder() *JSONReplyEncoder { 284 return &JSONReplyEncoder{} 285} 286 287// Encode Reply to bytes. 288func (e *JSONReplyEncoder) Encode(r *Reply) ([]byte, error) { 289 if r.Id != 0 { 290 // Only check command result reply. Push reply JSON validation is done in PushEncoder. 291 if err := isValidJSON(r.Result); err != nil { 292 return nil, err 293 } 294 } 295 jw := newWriter() 296 r.MarshalEasyJSON(jw) 297 return jw.BuildBytes() 298} 299 300// ProtobufReplyEncoder ... 301type ProtobufReplyEncoder struct{} 302 303// NewProtobufReplyEncoder ... 304func NewProtobufReplyEncoder() *ProtobufReplyEncoder { 305 return &ProtobufReplyEncoder{} 306} 307 308// Encode Reply to bytes. 309func (e *ProtobufReplyEncoder) Encode(r *Reply) ([]byte, error) { 310 return r.MarshalVT() 311} 312 313// DataEncoder ... 314type DataEncoder interface { 315 Reset() 316 Encode([]byte) error 317 Finish() []byte 318} 319 320// JSONDataEncoder ... 321type JSONDataEncoder struct { 322 count int 323 buffer bytes.Buffer 324} 325 326// NewJSONDataEncoder ... 327func NewJSONDataEncoder() *JSONDataEncoder { 328 return &JSONDataEncoder{} 329} 330 331// Reset ... 332func (e *JSONDataEncoder) Reset() { 333 e.count = 0 334 e.buffer.Reset() 335} 336 337// Encode ... 338func (e *JSONDataEncoder) Encode(data []byte) error { 339 if e.count > 0 { 340 e.buffer.WriteString("\n") 341 } 342 e.buffer.Write(data) 343 e.count++ 344 return nil 345} 346 347// Finish ... 348func (e *JSONDataEncoder) Finish() []byte { 349 data := e.buffer.Bytes() 350 dataCopy := make([]byte, len(data)) 351 copy(dataCopy, data) 352 return dataCopy 353} 354 355// ProtobufDataEncoder ... 356type ProtobufDataEncoder struct { 357 buffer bytes.Buffer 358} 359 360// NewProtobufDataEncoder ... 361func NewProtobufDataEncoder() *ProtobufDataEncoder { 362 return &ProtobufDataEncoder{} 363} 364 365// Encode ... 366func (e *ProtobufDataEncoder) Encode(data []byte) error { 367 bs := make([]byte, 8) 368 n := binary.PutUvarint(bs, uint64(len(data))) 369 e.buffer.Write(bs[:n]) 370 e.buffer.Write(data) 371 return nil 372} 373 374// Reset ... 375func (e *ProtobufDataEncoder) Reset() { 376 e.buffer.Reset() 377} 378 379// Finish ... 380func (e *ProtobufDataEncoder) Finish() []byte { 381 data := e.buffer.Bytes() 382 dataCopy := make([]byte, len(data)) 383 copy(dataCopy, data) 384 return dataCopy 385} 386 387// ResultEncoder ... 388type ResultEncoder interface { 389 EncodeConnectResult(*ConnectResult) ([]byte, error) 390 EncodeRefreshResult(*RefreshResult) ([]byte, error) 391 EncodeSubscribeResult(*SubscribeResult) ([]byte, error) 392 EncodeSubRefreshResult(*SubRefreshResult) ([]byte, error) 393 EncodeUnsubscribeResult(*UnsubscribeResult) ([]byte, error) 394 EncodePublishResult(*PublishResult) ([]byte, error) 395 EncodePresenceResult(*PresenceResult) ([]byte, error) 396 EncodePresenceStatsResult(*PresenceStatsResult) ([]byte, error) 397 EncodeHistoryResult(*HistoryResult) ([]byte, error) 398 EncodePingResult(*PingResult) ([]byte, error) 399 EncodeRPCResult(*RPCResult) ([]byte, error) 400} 401 402// JSONResultEncoder ... 403type JSONResultEncoder struct{} 404 405// NewJSONResultEncoder ... 406func NewJSONResultEncoder() *JSONResultEncoder { 407 return &JSONResultEncoder{} 408} 409 410// EncodeConnectResult ... 411func (e *JSONResultEncoder) EncodeConnectResult(res *ConnectResult) ([]byte, error) { 412 jw := newWriter() 413 res.MarshalEasyJSON(jw) 414 return jw.BuildBytes() 415} 416 417// EncodeRefreshResult ... 418func (e *JSONResultEncoder) EncodeRefreshResult(res *RefreshResult) ([]byte, error) { 419 jw := newWriter() 420 res.MarshalEasyJSON(jw) 421 return jw.BuildBytes() 422} 423 424// EncodeSubscribeResult ... 425func (e *JSONResultEncoder) EncodeSubscribeResult(res *SubscribeResult) ([]byte, error) { 426 jw := newWriter() 427 res.MarshalEasyJSON(jw) 428 return jw.BuildBytes() 429} 430 431// EncodeSubRefreshResult ... 432func (e *JSONResultEncoder) EncodeSubRefreshResult(res *SubRefreshResult) ([]byte, error) { 433 jw := newWriter() 434 res.MarshalEasyJSON(jw) 435 return jw.BuildBytes() 436} 437 438// EncodeUnsubscribeResult ... 439func (e *JSONResultEncoder) EncodeUnsubscribeResult(res *UnsubscribeResult) ([]byte, error) { 440 jw := newWriter() 441 res.MarshalEasyJSON(jw) 442 return jw.BuildBytes() 443} 444 445// EncodePublishResult ... 446func (e *JSONResultEncoder) EncodePublishResult(res *PublishResult) ([]byte, error) { 447 jw := newWriter() 448 res.MarshalEasyJSON(jw) 449 return jw.BuildBytes() 450} 451 452// EncodePresenceResult ... 453func (e *JSONResultEncoder) EncodePresenceResult(res *PresenceResult) ([]byte, error) { 454 jw := newWriter() 455 res.MarshalEasyJSON(jw) 456 return jw.BuildBytes() 457} 458 459// EncodePresenceStatsResult ... 460func (e *JSONResultEncoder) EncodePresenceStatsResult(res *PresenceStatsResult) ([]byte, error) { 461 jw := newWriter() 462 res.MarshalEasyJSON(jw) 463 return jw.BuildBytes() 464} 465 466// EncodeHistoryResult ... 467func (e *JSONResultEncoder) EncodeHistoryResult(res *HistoryResult) ([]byte, error) { 468 jw := newWriter() 469 res.MarshalEasyJSON(jw) 470 return jw.BuildBytes() 471} 472 473// EncodePingResult ... 474func (e *JSONResultEncoder) EncodePingResult(res *PingResult) ([]byte, error) { 475 jw := newWriter() 476 res.MarshalEasyJSON(jw) 477 return jw.BuildBytes() 478} 479 480// EncodeRPCResult ... 481func (e *JSONResultEncoder) EncodeRPCResult(res *RPCResult) ([]byte, error) { 482 jw := newWriter() 483 res.MarshalEasyJSON(jw) 484 return jw.BuildBytes() 485} 486 487// ProtobufResultEncoder ... 488type ProtobufResultEncoder struct{} 489 490// NewProtobufResultEncoder ... 491func NewProtobufResultEncoder() *ProtobufResultEncoder { 492 return &ProtobufResultEncoder{} 493} 494 495// EncodeConnectResult ... 496func (e *ProtobufResultEncoder) EncodeConnectResult(res *ConnectResult) ([]byte, error) { 497 return res.MarshalVT() 498} 499 500// EncodeRefreshResult ... 501func (e *ProtobufResultEncoder) EncodeRefreshResult(res *RefreshResult) ([]byte, error) { 502 return res.MarshalVT() 503} 504 505// EncodeSubscribeResult ... 506func (e *ProtobufResultEncoder) EncodeSubscribeResult(res *SubscribeResult) ([]byte, error) { 507 return res.MarshalVT() 508} 509 510// EncodeSubRefreshResult ... 511func (e *ProtobufResultEncoder) EncodeSubRefreshResult(res *SubRefreshResult) ([]byte, error) { 512 return res.MarshalVT() 513} 514 515// EncodeUnsubscribeResult ... 516func (e *ProtobufResultEncoder) EncodeUnsubscribeResult(res *UnsubscribeResult) ([]byte, error) { 517 return res.MarshalVT() 518} 519 520// EncodePublishResult ... 521func (e *ProtobufResultEncoder) EncodePublishResult(res *PublishResult) ([]byte, error) { 522 return res.MarshalVT() 523} 524 525// EncodePresenceResult ... 526func (e *ProtobufResultEncoder) EncodePresenceResult(res *PresenceResult) ([]byte, error) { 527 return res.MarshalVT() 528} 529 530// EncodePresenceStatsResult ... 531func (e *ProtobufResultEncoder) EncodePresenceStatsResult(res *PresenceStatsResult) ([]byte, error) { 532 return res.MarshalVT() 533} 534 535// EncodeHistoryResult ... 536func (e *ProtobufResultEncoder) EncodeHistoryResult(res *HistoryResult) ([]byte, error) { 537 return res.MarshalVT() 538} 539 540// EncodePingResult ... 541func (e *ProtobufResultEncoder) EncodePingResult(res *PingResult) ([]byte, error) { 542 return res.MarshalVT() 543} 544 545// EncodeRPCResult ... 546func (e *ProtobufResultEncoder) EncodeRPCResult(res *RPCResult) ([]byte, error) { 547 return res.MarshalVT() 548} 549 550// CommandEncoder ... 551type CommandEncoder interface { 552 Encode(cmd *Command) ([]byte, error) 553} 554 555// JSONCommandEncoder ... 556type JSONCommandEncoder struct { 557} 558 559// NewJSONCommandEncoder ... 560func NewJSONCommandEncoder() *JSONCommandEncoder { 561 return &JSONCommandEncoder{} 562} 563 564// Encode ... 565func (e *JSONCommandEncoder) Encode(cmd *Command) ([]byte, error) { 566 jw := newWriter() 567 cmd.MarshalEasyJSON(jw) 568 return jw.BuildBytes() 569} 570 571// ProtobufCommandEncoder ... 572type ProtobufCommandEncoder struct { 573} 574 575// NewProtobufCommandEncoder ... 576func NewProtobufCommandEncoder() *ProtobufCommandEncoder { 577 return &ProtobufCommandEncoder{} 578} 579 580// Encode ... 581func (e *ProtobufCommandEncoder) Encode(cmd *Command) ([]byte, error) { 582 commandBytes, err := cmd.MarshalVT() 583 if err != nil { 584 return nil, err 585 } 586 bs := make([]byte, 8) 587 n := binary.PutUvarint(bs, uint64(len(commandBytes))) 588 var buf bytes.Buffer 589 buf.Write(bs[:n]) 590 buf.Write(commandBytes) 591 return buf.Bytes(), nil 592} 593 594// ParamsEncoder ... 595type ParamsEncoder interface { 596 Encode(request interface{}) ([]byte, error) 597} 598 599var _ ParamsEncoder = NewJSONParamsEncoder() 600 601// JSONParamsEncoder ... 602type JSONParamsEncoder struct{} 603 604// NewJSONParamsEncoder ... 605func NewJSONParamsEncoder() *JSONParamsEncoder { 606 return &JSONParamsEncoder{} 607} 608 609// Encode ... 610func (d *JSONParamsEncoder) Encode(r interface{}) ([]byte, error) { 611 return fastJSON.Marshal(r) 612} 613 614var _ ParamsEncoder = NewProtobufParamsEncoder() 615 616// ProtobufParamsEncoder ... 617type ProtobufParamsEncoder struct{} 618 619// NewProtobufParamsEncoder ... 620func NewProtobufParamsEncoder() *ProtobufParamsEncoder { 621 return &ProtobufParamsEncoder{} 622} 623 624type vtMarshaler interface { 625 MarshalVT() (dAtA []byte, err error) 626} 627 628// Encode ... 629func (d *ProtobufParamsEncoder) Encode(r interface{}) ([]byte, error) { 630 m, ok := r.(vtMarshaler) 631 if !ok { 632 return nil, fmt.Errorf("can not marshal type %T to Protobuf", r) 633 } 634 return m.MarshalVT() 635} 636