1// Copyright (c) 2012 The gocql Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style 3// license that can be found in the LICENSE file. 4 5package gocql 6 7import ( 8 "context" 9 "errors" 10 "fmt" 11 "io" 12 "io/ioutil" 13 "net" 14 "runtime" 15 "strings" 16 "time" 17) 18 19type unsetColumn struct{} 20 21// UnsetValue represents a value used in a query binding that will be ignored by Cassandra. 22// 23// By setting a field to the unset value Cassandra will ignore the write completely. 24// The main advantage is the ability to keep the same prepared statement even when you don't 25// want to update some fields, where before you needed to make another prepared statement. 26// 27// UnsetValue is only available when using the version 4 of the protocol. 28var UnsetValue = unsetColumn{} 29 30type namedValue struct { 31 name string 32 value interface{} 33} 34 35// NamedValue produce a value which will bind to the named parameter in a query 36func NamedValue(name string, value interface{}) interface{} { 37 return &namedValue{ 38 name: name, 39 value: value, 40 } 41} 42 43const ( 44 protoDirectionMask = 0x80 45 protoVersionMask = 0x7F 46 protoVersion1 = 0x01 47 protoVersion2 = 0x02 48 protoVersion3 = 0x03 49 protoVersion4 = 0x04 50 protoVersion5 = 0x05 51 52 maxFrameSize = 256 * 1024 * 1024 53) 54 55type protoVersion byte 56 57func (p protoVersion) request() bool { 58 return p&protoDirectionMask == 0x00 59} 60 61func (p protoVersion) response() bool { 62 return p&protoDirectionMask == 0x80 63} 64 65func (p protoVersion) version() byte { 66 return byte(p) & protoVersionMask 67} 68 69func (p protoVersion) String() string { 70 dir := "REQ" 71 if p.response() { 72 dir = "RESP" 73 } 74 75 return fmt.Sprintf("[version=%d direction=%s]", p.version(), dir) 76} 77 78type frameOp byte 79 80const ( 81 // header ops 82 opError frameOp = 0x00 83 opStartup frameOp = 0x01 84 opReady frameOp = 0x02 85 opAuthenticate frameOp = 0x03 86 opOptions frameOp = 0x05 87 opSupported frameOp = 0x06 88 opQuery frameOp = 0x07 89 opResult frameOp = 0x08 90 opPrepare frameOp = 0x09 91 opExecute frameOp = 0x0A 92 opRegister frameOp = 0x0B 93 opEvent frameOp = 0x0C 94 opBatch frameOp = 0x0D 95 opAuthChallenge frameOp = 0x0E 96 opAuthResponse frameOp = 0x0F 97 opAuthSuccess frameOp = 0x10 98) 99 100func (f frameOp) String() string { 101 switch f { 102 case opError: 103 return "ERROR" 104 case opStartup: 105 return "STARTUP" 106 case opReady: 107 return "READY" 108 case opAuthenticate: 109 return "AUTHENTICATE" 110 case opOptions: 111 return "OPTIONS" 112 case opSupported: 113 return "SUPPORTED" 114 case opQuery: 115 return "QUERY" 116 case opResult: 117 return "RESULT" 118 case opPrepare: 119 return "PREPARE" 120 case opExecute: 121 return "EXECUTE" 122 case opRegister: 123 return "REGISTER" 124 case opEvent: 125 return "EVENT" 126 case opBatch: 127 return "BATCH" 128 case opAuthChallenge: 129 return "AUTH_CHALLENGE" 130 case opAuthResponse: 131 return "AUTH_RESPONSE" 132 case opAuthSuccess: 133 return "AUTH_SUCCESS" 134 default: 135 return fmt.Sprintf("UNKNOWN_OP_%d", f) 136 } 137} 138 139const ( 140 // result kind 141 resultKindVoid = 1 142 resultKindRows = 2 143 resultKindKeyspace = 3 144 resultKindPrepared = 4 145 resultKindSchemaChanged = 5 146 147 // rows flags 148 flagGlobalTableSpec int = 0x01 149 flagHasMorePages int = 0x02 150 flagNoMetaData int = 0x04 151 152 // query flags 153 flagValues byte = 0x01 154 flagSkipMetaData byte = 0x02 155 flagPageSize byte = 0x04 156 flagWithPagingState byte = 0x08 157 flagWithSerialConsistency byte = 0x10 158 flagDefaultTimestamp byte = 0x20 159 flagWithNameValues byte = 0x40 160 flagWithKeyspace byte = 0x80 161 162 // prepare flags 163 flagWithPreparedKeyspace uint32 = 0x01 164 165 // header flags 166 flagCompress byte = 0x01 167 flagTracing byte = 0x02 168 flagCustomPayload byte = 0x04 169 flagWarning byte = 0x08 170 flagBetaProtocol byte = 0x10 171) 172 173type Consistency uint16 174 175const ( 176 Any Consistency = 0x00 177 One Consistency = 0x01 178 Two Consistency = 0x02 179 Three Consistency = 0x03 180 Quorum Consistency = 0x04 181 All Consistency = 0x05 182 LocalQuorum Consistency = 0x06 183 EachQuorum Consistency = 0x07 184 LocalOne Consistency = 0x0A 185) 186 187func (c Consistency) String() string { 188 switch c { 189 case Any: 190 return "ANY" 191 case One: 192 return "ONE" 193 case Two: 194 return "TWO" 195 case Three: 196 return "THREE" 197 case Quorum: 198 return "QUORUM" 199 case All: 200 return "ALL" 201 case LocalQuorum: 202 return "LOCAL_QUORUM" 203 case EachQuorum: 204 return "EACH_QUORUM" 205 case LocalOne: 206 return "LOCAL_ONE" 207 default: 208 return fmt.Sprintf("UNKNOWN_CONS_0x%x", uint16(c)) 209 } 210} 211 212func (c Consistency) MarshalText() (text []byte, err error) { 213 return []byte(c.String()), nil 214} 215 216func (c *Consistency) UnmarshalText(text []byte) error { 217 switch string(text) { 218 case "ANY": 219 *c = Any 220 case "ONE": 221 *c = One 222 case "TWO": 223 *c = Two 224 case "THREE": 225 *c = Three 226 case "QUORUM": 227 *c = Quorum 228 case "ALL": 229 *c = All 230 case "LOCAL_QUORUM": 231 *c = LocalQuorum 232 case "EACH_QUORUM": 233 *c = EachQuorum 234 case "LOCAL_ONE": 235 *c = LocalOne 236 default: 237 return fmt.Errorf("invalid consistency %q", string(text)) 238 } 239 240 return nil 241} 242 243func ParseConsistency(s string) Consistency { 244 var c Consistency 245 if err := c.UnmarshalText([]byte(strings.ToUpper(s))); err != nil { 246 panic(err) 247 } 248 return c 249} 250 251// ParseConsistencyWrapper wraps gocql.ParseConsistency to provide an err 252// return instead of a panic 253func ParseConsistencyWrapper(s string) (consistency Consistency, err error) { 254 err = consistency.UnmarshalText([]byte(strings.ToUpper(s))) 255 return 256} 257 258// MustParseConsistency is the same as ParseConsistency except it returns 259// an error (never). It is kept here since breaking changes are not good. 260// DEPRECATED: use ParseConsistency if you want a panic on parse error. 261func MustParseConsistency(s string) (Consistency, error) { 262 c, err := ParseConsistencyWrapper(s) 263 if err != nil { 264 panic(err) 265 } 266 return c, nil 267} 268 269type SerialConsistency uint16 270 271const ( 272 Serial SerialConsistency = 0x08 273 LocalSerial SerialConsistency = 0x09 274) 275 276func (s SerialConsistency) String() string { 277 switch s { 278 case Serial: 279 return "SERIAL" 280 case LocalSerial: 281 return "LOCAL_SERIAL" 282 default: 283 return fmt.Sprintf("UNKNOWN_SERIAL_CONS_0x%x", uint16(s)) 284 } 285} 286 287func (s SerialConsistency) MarshalText() (text []byte, err error) { 288 return []byte(s.String()), nil 289} 290 291func (s *SerialConsistency) UnmarshalText(text []byte) error { 292 switch string(text) { 293 case "SERIAL": 294 *s = Serial 295 case "LOCAL_SERIAL": 296 *s = LocalSerial 297 default: 298 return fmt.Errorf("invalid consistency %q", string(text)) 299 } 300 301 return nil 302} 303 304const ( 305 apacheCassandraTypePrefix = "org.apache.cassandra.db.marshal." 306) 307 308var ( 309 ErrFrameTooBig = errors.New("frame length is bigger than the maximum allowed") 310) 311 312const maxFrameHeaderSize = 9 313 314func writeInt(p []byte, n int32) { 315 p[0] = byte(n >> 24) 316 p[1] = byte(n >> 16) 317 p[2] = byte(n >> 8) 318 p[3] = byte(n) 319} 320 321func readInt(p []byte) int32 { 322 return int32(p[0])<<24 | int32(p[1])<<16 | int32(p[2])<<8 | int32(p[3]) 323} 324 325func writeShort(p []byte, n uint16) { 326 p[0] = byte(n >> 8) 327 p[1] = byte(n) 328} 329 330func readShort(p []byte) uint16 { 331 return uint16(p[0])<<8 | uint16(p[1]) 332} 333 334type frameHeader struct { 335 version protoVersion 336 flags byte 337 stream int 338 op frameOp 339 length int 340 warnings []string 341} 342 343func (f frameHeader) String() string { 344 return fmt.Sprintf("[header version=%s flags=0x%x stream=%d op=%s length=%d]", f.version, f.flags, f.stream, f.op, f.length) 345} 346 347func (f frameHeader) Header() frameHeader { 348 return f 349} 350 351const defaultBufSize = 128 352 353type ObservedFrameHeader struct { 354 Version protoVersion 355 Flags byte 356 Stream int16 357 Opcode frameOp 358 Length int32 359 360 // StartHeader is the time we started reading the frame header off the network connection. 361 Start time.Time 362 // EndHeader is the time we finished reading the frame header off the network connection. 363 End time.Time 364} 365 366func (f ObservedFrameHeader) String() string { 367 return fmt.Sprintf("[observed header version=%s flags=0x%x stream=%d op=%s length=%d]", f.Version, f.Flags, f.Stream, f.Opcode, f.Length) 368} 369 370// FrameHeaderObserver is the interface implemented by frame observers / stat collectors. 371// 372// Experimental, this interface and use may change 373type FrameHeaderObserver interface { 374 // ObserveFrameHeader gets called on every received frame header. 375 ObserveFrameHeader(context.Context, ObservedFrameHeader) 376} 377 378// a framer is responsible for reading, writing and parsing frames on a single stream 379type framer struct { 380 r io.Reader 381 w io.Writer 382 383 proto byte 384 // flags are for outgoing flags, enabling compression and tracing etc 385 flags byte 386 compres Compressor 387 headSize int 388 // if this frame was read then the header will be here 389 header *frameHeader 390 391 // if tracing flag is set this is not nil 392 traceID []byte 393 394 // holds a ref to the whole byte slice for rbuf so that it can be reset to 395 // 0 after a read. 396 readBuffer []byte 397 398 rbuf []byte 399 wbuf []byte 400 401 customPayload map[string][]byte 402} 403 404func newFramer(r io.Reader, w io.Writer, compressor Compressor, version byte) *framer { 405 f := &framer{ 406 wbuf: make([]byte, defaultBufSize), 407 readBuffer: make([]byte, defaultBufSize), 408 } 409 var flags byte 410 if compressor != nil { 411 flags |= flagCompress 412 } 413 if version == protoVersion5 { 414 flags |= flagBetaProtocol 415 } 416 417 version &= protoVersionMask 418 419 headSize := 8 420 if version > protoVersion2 { 421 headSize = 9 422 } 423 424 f.compres = compressor 425 f.proto = version 426 f.flags = flags 427 f.headSize = headSize 428 429 f.r = r 430 f.rbuf = f.readBuffer[:0] 431 432 f.w = w 433 f.wbuf = f.wbuf[:0] 434 435 f.header = nil 436 f.traceID = nil 437 438 return f 439} 440 441type frame interface { 442 Header() frameHeader 443} 444 445func readHeader(r io.Reader, p []byte) (head frameHeader, err error) { 446 _, err = io.ReadFull(r, p[:1]) 447 if err != nil { 448 return frameHeader{}, err 449 } 450 451 version := p[0] & protoVersionMask 452 453 if version < protoVersion1 || version > protoVersion5 { 454 return frameHeader{}, fmt.Errorf("gocql: unsupported protocol response version: %d", version) 455 } 456 457 headSize := 9 458 if version < protoVersion3 { 459 headSize = 8 460 } 461 462 _, err = io.ReadFull(r, p[1:headSize]) 463 if err != nil { 464 return frameHeader{}, err 465 } 466 467 p = p[:headSize] 468 469 head.version = protoVersion(p[0]) 470 head.flags = p[1] 471 472 if version > protoVersion2 { 473 if len(p) != 9 { 474 return frameHeader{}, fmt.Errorf("not enough bytes to read header require 9 got: %d", len(p)) 475 } 476 477 head.stream = int(int16(p[2])<<8 | int16(p[3])) 478 head.op = frameOp(p[4]) 479 head.length = int(readInt(p[5:])) 480 } else { 481 if len(p) != 8 { 482 return frameHeader{}, fmt.Errorf("not enough bytes to read header require 8 got: %d", len(p)) 483 } 484 485 head.stream = int(int8(p[2])) 486 head.op = frameOp(p[3]) 487 head.length = int(readInt(p[4:])) 488 } 489 490 return head, nil 491} 492 493// explicitly enables tracing for the framers outgoing requests 494func (f *framer) trace() { 495 f.flags |= flagTracing 496} 497 498// explicitly enables the custom payload flag 499func (f *framer) payload() { 500 f.flags |= flagCustomPayload 501} 502 503// reads a frame form the wire into the framers buffer 504func (f *framer) readFrame(head *frameHeader) error { 505 if head.length < 0 { 506 return fmt.Errorf("frame body length can not be less than 0: %d", head.length) 507 } else if head.length > maxFrameSize { 508 // need to free up the connection to be used again 509 _, err := io.CopyN(ioutil.Discard, f.r, int64(head.length)) 510 if err != nil { 511 return fmt.Errorf("error whilst trying to discard frame with invalid length: %v", err) 512 } 513 return ErrFrameTooBig 514 } 515 516 if cap(f.readBuffer) >= head.length { 517 f.rbuf = f.readBuffer[:head.length] 518 } else { 519 f.readBuffer = make([]byte, head.length) 520 f.rbuf = f.readBuffer 521 } 522 523 // assume the underlying reader takes care of timeouts and retries 524 n, err := io.ReadFull(f.r, f.rbuf) 525 if err != nil { 526 return fmt.Errorf("unable to read frame body: read %d/%d bytes: %v", n, head.length, err) 527 } 528 529 if head.flags&flagCompress == flagCompress { 530 if f.compres == nil { 531 return NewErrProtocol("no compressor available with compressed frame body") 532 } 533 534 f.rbuf, err = f.compres.Decode(f.rbuf) 535 if err != nil { 536 return err 537 } 538 } 539 540 f.header = head 541 return nil 542} 543 544func (f *framer) parseFrame() (frame frame, err error) { 545 defer func() { 546 if r := recover(); r != nil { 547 if _, ok := r.(runtime.Error); ok { 548 panic(r) 549 } 550 err = r.(error) 551 } 552 }() 553 554 if f.header.version.request() { 555 return nil, NewErrProtocol("got a request frame from server: %v", f.header.version) 556 } 557 558 if f.header.flags&flagTracing == flagTracing { 559 f.readTrace() 560 } 561 562 if f.header.flags&flagWarning == flagWarning { 563 f.header.warnings = f.readStringList() 564 } 565 566 if f.header.flags&flagCustomPayload == flagCustomPayload { 567 f.customPayload = f.readBytesMap() 568 } 569 570 // assumes that the frame body has been read into rbuf 571 switch f.header.op { 572 case opError: 573 frame = f.parseErrorFrame() 574 case opReady: 575 frame = f.parseReadyFrame() 576 case opResult: 577 frame, err = f.parseResultFrame() 578 case opSupported: 579 frame = f.parseSupportedFrame() 580 case opAuthenticate: 581 frame = f.parseAuthenticateFrame() 582 case opAuthChallenge: 583 frame = f.parseAuthChallengeFrame() 584 case opAuthSuccess: 585 frame = f.parseAuthSuccessFrame() 586 case opEvent: 587 frame = f.parseEventFrame() 588 default: 589 return nil, NewErrProtocol("unknown op in frame header: %s", f.header.op) 590 } 591 592 return 593} 594 595func (f *framer) parseErrorFrame() frame { 596 code := f.readInt() 597 msg := f.readString() 598 599 errD := errorFrame{ 600 frameHeader: *f.header, 601 code: code, 602 message: msg, 603 } 604 605 switch code { 606 case errUnavailable: 607 cl := f.readConsistency() 608 required := f.readInt() 609 alive := f.readInt() 610 return &RequestErrUnavailable{ 611 errorFrame: errD, 612 Consistency: cl, 613 Required: required, 614 Alive: alive, 615 } 616 case errWriteTimeout: 617 cl := f.readConsistency() 618 received := f.readInt() 619 blockfor := f.readInt() 620 writeType := f.readString() 621 return &RequestErrWriteTimeout{ 622 errorFrame: errD, 623 Consistency: cl, 624 Received: received, 625 BlockFor: blockfor, 626 WriteType: writeType, 627 } 628 case errReadTimeout: 629 cl := f.readConsistency() 630 received := f.readInt() 631 blockfor := f.readInt() 632 dataPresent := f.readByte() 633 return &RequestErrReadTimeout{ 634 errorFrame: errD, 635 Consistency: cl, 636 Received: received, 637 BlockFor: blockfor, 638 DataPresent: dataPresent, 639 } 640 case errAlreadyExists: 641 ks := f.readString() 642 table := f.readString() 643 return &RequestErrAlreadyExists{ 644 errorFrame: errD, 645 Keyspace: ks, 646 Table: table, 647 } 648 case errUnprepared: 649 stmtId := f.readShortBytes() 650 return &RequestErrUnprepared{ 651 errorFrame: errD, 652 StatementId: copyBytes(stmtId), // defensively copy 653 } 654 case errReadFailure: 655 res := &RequestErrReadFailure{ 656 errorFrame: errD, 657 } 658 res.Consistency = f.readConsistency() 659 res.Received = f.readInt() 660 res.BlockFor = f.readInt() 661 if f.proto > protoVersion4 { 662 res.ErrorMap = f.readErrorMap() 663 res.NumFailures = len(res.ErrorMap) 664 } else { 665 res.NumFailures = f.readInt() 666 } 667 res.DataPresent = f.readByte() != 0 668 669 return res 670 case errWriteFailure: 671 res := &RequestErrWriteFailure{ 672 errorFrame: errD, 673 } 674 res.Consistency = f.readConsistency() 675 res.Received = f.readInt() 676 res.BlockFor = f.readInt() 677 if f.proto > protoVersion4 { 678 res.ErrorMap = f.readErrorMap() 679 res.NumFailures = len(res.ErrorMap) 680 } else { 681 res.NumFailures = f.readInt() 682 } 683 res.WriteType = f.readString() 684 return res 685 case errFunctionFailure: 686 res := &RequestErrFunctionFailure{ 687 errorFrame: errD, 688 } 689 res.Keyspace = f.readString() 690 res.Function = f.readString() 691 res.ArgTypes = f.readStringList() 692 return res 693 694 case errCDCWriteFailure: 695 res := &RequestErrCDCWriteFailure{ 696 errorFrame: errD, 697 } 698 return res 699 700 case errInvalid, errBootstrapping, errConfig, errCredentials, errOverloaded, 701 errProtocol, errServer, errSyntax, errTruncate, errUnauthorized: 702 // TODO(zariel): we should have some distinct types for these errors 703 return errD 704 default: 705 panic(fmt.Errorf("unknown error code: 0x%x", errD.code)) 706 } 707} 708 709func (f *framer) readErrorMap() (errMap ErrorMap) { 710 errMap = make(ErrorMap) 711 numErrs := f.readInt() 712 for i := 0; i < numErrs; i++ { 713 ip := f.readInetAdressOnly().String() 714 errMap[ip] = f.readShort() 715 } 716 return 717} 718 719func (f *framer) writeHeader(flags byte, op frameOp, stream int) { 720 f.wbuf = f.wbuf[:0] 721 f.wbuf = append(f.wbuf, 722 f.proto, 723 flags, 724 ) 725 726 if f.proto > protoVersion2 { 727 f.wbuf = append(f.wbuf, 728 byte(stream>>8), 729 byte(stream), 730 ) 731 } else { 732 f.wbuf = append(f.wbuf, 733 byte(stream), 734 ) 735 } 736 737 // pad out length 738 f.wbuf = append(f.wbuf, 739 byte(op), 740 0, 741 0, 742 0, 743 0, 744 ) 745} 746 747func (f *framer) setLength(length int) { 748 p := 4 749 if f.proto > protoVersion2 { 750 p = 5 751 } 752 753 f.wbuf[p+0] = byte(length >> 24) 754 f.wbuf[p+1] = byte(length >> 16) 755 f.wbuf[p+2] = byte(length >> 8) 756 f.wbuf[p+3] = byte(length) 757} 758 759func (f *framer) finishWrite() error { 760 if len(f.wbuf) > maxFrameSize { 761 // huge app frame, lets remove it so it doesn't bloat the heap 762 f.wbuf = make([]byte, defaultBufSize) 763 return ErrFrameTooBig 764 } 765 766 if f.wbuf[1]&flagCompress == flagCompress { 767 if f.compres == nil { 768 panic("compress flag set with no compressor") 769 } 770 771 // TODO: only compress frames which are big enough 772 compressed, err := f.compres.Encode(f.wbuf[f.headSize:]) 773 if err != nil { 774 return err 775 } 776 777 f.wbuf = append(f.wbuf[:f.headSize], compressed...) 778 } 779 length := len(f.wbuf) - f.headSize 780 f.setLength(length) 781 782 _, err := f.w.Write(f.wbuf) 783 if err != nil { 784 return err 785 } 786 787 return nil 788} 789 790func (f *framer) readTrace() { 791 f.traceID = f.readUUID().Bytes() 792} 793 794type readyFrame struct { 795 frameHeader 796} 797 798func (f *framer) parseReadyFrame() frame { 799 return &readyFrame{ 800 frameHeader: *f.header, 801 } 802} 803 804type supportedFrame struct { 805 frameHeader 806 807 supported map[string][]string 808} 809 810// TODO: if we move the body buffer onto the frameHeader then we only need a single 811// framer, and can move the methods onto the header. 812func (f *framer) parseSupportedFrame() frame { 813 return &supportedFrame{ 814 frameHeader: *f.header, 815 816 supported: f.readStringMultiMap(), 817 } 818} 819 820type writeStartupFrame struct { 821 opts map[string]string 822} 823 824func (w writeStartupFrame) String() string { 825 return fmt.Sprintf("[startup opts=%+v]", w.opts) 826} 827 828func (w *writeStartupFrame) writeFrame(f *framer, streamID int) error { 829 f.writeHeader(f.flags&^flagCompress, opStartup, streamID) 830 f.writeStringMap(w.opts) 831 832 return f.finishWrite() 833} 834 835type writePrepareFrame struct { 836 statement string 837 keyspace string 838 customPayload map[string][]byte 839} 840 841func (w *writePrepareFrame) writeFrame(f *framer, streamID int) error { 842 if len(w.customPayload) > 0 { 843 f.payload() 844 } 845 f.writeHeader(f.flags, opPrepare, streamID) 846 f.writeCustomPayload(&w.customPayload) 847 f.writeLongString(w.statement) 848 849 var flags uint32 = 0 850 if w.keyspace != "" { 851 if f.proto > protoVersion4 { 852 flags |= flagWithPreparedKeyspace 853 } else { 854 panic(fmt.Errorf("The keyspace can only be set with protocol 5 or higher")) 855 } 856 } 857 if f.proto > protoVersion4 { 858 f.writeUint(flags) 859 } 860 if w.keyspace != "" { 861 f.writeString(w.keyspace) 862 } 863 864 return f.finishWrite() 865} 866 867func (f *framer) readTypeInfo() TypeInfo { 868 // TODO: factor this out so the same code paths can be used to parse custom 869 // types and other types, as much of the logic will be duplicated. 870 id := f.readShort() 871 872 simple := NativeType{ 873 proto: f.proto, 874 typ: Type(id), 875 } 876 877 if simple.typ == TypeCustom { 878 simple.custom = f.readString() 879 if cassType := getApacheCassandraType(simple.custom); cassType != TypeCustom { 880 simple.typ = cassType 881 } 882 } 883 884 switch simple.typ { 885 case TypeTuple: 886 n := f.readShort() 887 tuple := TupleTypeInfo{ 888 NativeType: simple, 889 Elems: make([]TypeInfo, n), 890 } 891 892 for i := 0; i < int(n); i++ { 893 tuple.Elems[i] = f.readTypeInfo() 894 } 895 896 return tuple 897 898 case TypeUDT: 899 udt := UDTTypeInfo{ 900 NativeType: simple, 901 } 902 udt.KeySpace = f.readString() 903 udt.Name = f.readString() 904 905 n := f.readShort() 906 udt.Elements = make([]UDTField, n) 907 for i := 0; i < int(n); i++ { 908 field := &udt.Elements[i] 909 field.Name = f.readString() 910 field.Type = f.readTypeInfo() 911 } 912 913 return udt 914 case TypeMap, TypeList, TypeSet: 915 collection := CollectionType{ 916 NativeType: simple, 917 } 918 919 if simple.typ == TypeMap { 920 collection.Key = f.readTypeInfo() 921 } 922 923 collection.Elem = f.readTypeInfo() 924 925 return collection 926 } 927 928 return simple 929} 930 931type preparedMetadata struct { 932 resultMetadata 933 934 // proto v4+ 935 pkeyColumns []int 936} 937 938func (r preparedMetadata) String() string { 939 return fmt.Sprintf("[prepared flags=0x%x pkey=%v paging_state=% X columns=%v col_count=%d actual_col_count=%d]", r.flags, r.pkeyColumns, r.pagingState, r.columns, r.colCount, r.actualColCount) 940} 941 942func (f *framer) parsePreparedMetadata() preparedMetadata { 943 // TODO: deduplicate this from parseMetadata 944 meta := preparedMetadata{} 945 946 meta.flags = f.readInt() 947 meta.colCount = f.readInt() 948 if meta.colCount < 0 { 949 panic(fmt.Errorf("received negative column count: %d", meta.colCount)) 950 } 951 meta.actualColCount = meta.colCount 952 953 if f.proto >= protoVersion4 { 954 pkeyCount := f.readInt() 955 pkeys := make([]int, pkeyCount) 956 for i := 0; i < pkeyCount; i++ { 957 pkeys[i] = int(f.readShort()) 958 } 959 meta.pkeyColumns = pkeys 960 } 961 962 if meta.flags&flagHasMorePages == flagHasMorePages { 963 meta.pagingState = copyBytes(f.readBytes()) 964 } 965 966 if meta.flags&flagNoMetaData == flagNoMetaData { 967 return meta 968 } 969 970 var keyspace, table string 971 globalSpec := meta.flags&flagGlobalTableSpec == flagGlobalTableSpec 972 if globalSpec { 973 keyspace = f.readString() 974 table = f.readString() 975 } 976 977 var cols []ColumnInfo 978 if meta.colCount < 1000 { 979 // preallocate columninfo to avoid excess copying 980 cols = make([]ColumnInfo, meta.colCount) 981 for i := 0; i < meta.colCount; i++ { 982 f.readCol(&cols[i], &meta.resultMetadata, globalSpec, keyspace, table) 983 } 984 } else { 985 // use append, huge number of columns usually indicates a corrupt frame or 986 // just a huge row. 987 for i := 0; i < meta.colCount; i++ { 988 var col ColumnInfo 989 f.readCol(&col, &meta.resultMetadata, globalSpec, keyspace, table) 990 cols = append(cols, col) 991 } 992 } 993 994 meta.columns = cols 995 996 return meta 997} 998 999type resultMetadata struct { 1000 flags int 1001 1002 // only if flagPageState 1003 pagingState []byte 1004 1005 columns []ColumnInfo 1006 colCount int 1007 1008 // this is a count of the total number of columns which can be scanned, 1009 // it is at minimum len(columns) but may be larger, for instance when a column 1010 // is a UDT or tuple. 1011 actualColCount int 1012} 1013 1014func (r *resultMetadata) morePages() bool { 1015 return r.flags&flagHasMorePages == flagHasMorePages 1016} 1017 1018func (r resultMetadata) String() string { 1019 return fmt.Sprintf("[metadata flags=0x%x paging_state=% X columns=%v]", r.flags, r.pagingState, r.columns) 1020} 1021 1022func (f *framer) readCol(col *ColumnInfo, meta *resultMetadata, globalSpec bool, keyspace, table string) { 1023 if !globalSpec { 1024 col.Keyspace = f.readString() 1025 col.Table = f.readString() 1026 } else { 1027 col.Keyspace = keyspace 1028 col.Table = table 1029 } 1030 1031 col.Name = f.readString() 1032 col.TypeInfo = f.readTypeInfo() 1033 switch v := col.TypeInfo.(type) { 1034 // maybe also UDT 1035 case TupleTypeInfo: 1036 // -1 because we already included the tuple column 1037 meta.actualColCount += len(v.Elems) - 1 1038 } 1039} 1040 1041func (f *framer) parseResultMetadata() resultMetadata { 1042 var meta resultMetadata 1043 1044 meta.flags = f.readInt() 1045 meta.colCount = f.readInt() 1046 if meta.colCount < 0 { 1047 panic(fmt.Errorf("received negative column count: %d", meta.colCount)) 1048 } 1049 meta.actualColCount = meta.colCount 1050 1051 if meta.flags&flagHasMorePages == flagHasMorePages { 1052 meta.pagingState = copyBytes(f.readBytes()) 1053 } 1054 1055 if meta.flags&flagNoMetaData == flagNoMetaData { 1056 return meta 1057 } 1058 1059 var keyspace, table string 1060 globalSpec := meta.flags&flagGlobalTableSpec == flagGlobalTableSpec 1061 if globalSpec { 1062 keyspace = f.readString() 1063 table = f.readString() 1064 } 1065 1066 var cols []ColumnInfo 1067 if meta.colCount < 1000 { 1068 // preallocate columninfo to avoid excess copying 1069 cols = make([]ColumnInfo, meta.colCount) 1070 for i := 0; i < meta.colCount; i++ { 1071 f.readCol(&cols[i], &meta, globalSpec, keyspace, table) 1072 } 1073 1074 } else { 1075 // use append, huge number of columns usually indicates a corrupt frame or 1076 // just a huge row. 1077 for i := 0; i < meta.colCount; i++ { 1078 var col ColumnInfo 1079 f.readCol(&col, &meta, globalSpec, keyspace, table) 1080 cols = append(cols, col) 1081 } 1082 } 1083 1084 meta.columns = cols 1085 1086 return meta 1087} 1088 1089type resultVoidFrame struct { 1090 frameHeader 1091} 1092 1093func (f *resultVoidFrame) String() string { 1094 return "[result_void]" 1095} 1096 1097func (f *framer) parseResultFrame() (frame, error) { 1098 kind := f.readInt() 1099 1100 switch kind { 1101 case resultKindVoid: 1102 return &resultVoidFrame{frameHeader: *f.header}, nil 1103 case resultKindRows: 1104 return f.parseResultRows(), nil 1105 case resultKindKeyspace: 1106 return f.parseResultSetKeyspace(), nil 1107 case resultKindPrepared: 1108 return f.parseResultPrepared(), nil 1109 case resultKindSchemaChanged: 1110 return f.parseResultSchemaChange(), nil 1111 } 1112 1113 return nil, NewErrProtocol("unknown result kind: %x", kind) 1114} 1115 1116type resultRowsFrame struct { 1117 frameHeader 1118 1119 meta resultMetadata 1120 // dont parse the rows here as we only need to do it once 1121 numRows int 1122} 1123 1124func (f *resultRowsFrame) String() string { 1125 return fmt.Sprintf("[result_rows meta=%v]", f.meta) 1126} 1127 1128func (f *framer) parseResultRows() frame { 1129 result := &resultRowsFrame{} 1130 result.meta = f.parseResultMetadata() 1131 1132 result.numRows = f.readInt() 1133 if result.numRows < 0 { 1134 panic(fmt.Errorf("invalid row_count in result frame: %d", result.numRows)) 1135 } 1136 1137 return result 1138} 1139 1140type resultKeyspaceFrame struct { 1141 frameHeader 1142 keyspace string 1143} 1144 1145func (r *resultKeyspaceFrame) String() string { 1146 return fmt.Sprintf("[result_keyspace keyspace=%s]", r.keyspace) 1147} 1148 1149func (f *framer) parseResultSetKeyspace() frame { 1150 return &resultKeyspaceFrame{ 1151 frameHeader: *f.header, 1152 keyspace: f.readString(), 1153 } 1154} 1155 1156type resultPreparedFrame struct { 1157 frameHeader 1158 1159 preparedID []byte 1160 reqMeta preparedMetadata 1161 respMeta resultMetadata 1162} 1163 1164func (f *framer) parseResultPrepared() frame { 1165 frame := &resultPreparedFrame{ 1166 frameHeader: *f.header, 1167 preparedID: f.readShortBytes(), 1168 reqMeta: f.parsePreparedMetadata(), 1169 } 1170 1171 if f.proto < protoVersion2 { 1172 return frame 1173 } 1174 1175 frame.respMeta = f.parseResultMetadata() 1176 1177 return frame 1178} 1179 1180type schemaChangeKeyspace struct { 1181 frameHeader 1182 1183 change string 1184 keyspace string 1185} 1186 1187func (f schemaChangeKeyspace) String() string { 1188 return fmt.Sprintf("[event schema_change_keyspace change=%q keyspace=%q]", f.change, f.keyspace) 1189} 1190 1191type schemaChangeTable struct { 1192 frameHeader 1193 1194 change string 1195 keyspace string 1196 object string 1197} 1198 1199func (f schemaChangeTable) String() string { 1200 return fmt.Sprintf("[event schema_change change=%q keyspace=%q object=%q]", f.change, f.keyspace, f.object) 1201} 1202 1203type schemaChangeType struct { 1204 frameHeader 1205 1206 change string 1207 keyspace string 1208 object string 1209} 1210 1211type schemaChangeFunction struct { 1212 frameHeader 1213 1214 change string 1215 keyspace string 1216 name string 1217 args []string 1218} 1219 1220type schemaChangeAggregate struct { 1221 frameHeader 1222 1223 change string 1224 keyspace string 1225 name string 1226 args []string 1227} 1228 1229func (f *framer) parseResultSchemaChange() frame { 1230 if f.proto <= protoVersion2 { 1231 change := f.readString() 1232 keyspace := f.readString() 1233 table := f.readString() 1234 1235 if table != "" { 1236 return &schemaChangeTable{ 1237 frameHeader: *f.header, 1238 change: change, 1239 keyspace: keyspace, 1240 object: table, 1241 } 1242 } else { 1243 return &schemaChangeKeyspace{ 1244 frameHeader: *f.header, 1245 change: change, 1246 keyspace: keyspace, 1247 } 1248 } 1249 } else { 1250 change := f.readString() 1251 target := f.readString() 1252 1253 // TODO: could just use a separate type for each target 1254 switch target { 1255 case "KEYSPACE": 1256 frame := &schemaChangeKeyspace{ 1257 frameHeader: *f.header, 1258 change: change, 1259 } 1260 1261 frame.keyspace = f.readString() 1262 1263 return frame 1264 case "TABLE": 1265 frame := &schemaChangeTable{ 1266 frameHeader: *f.header, 1267 change: change, 1268 } 1269 1270 frame.keyspace = f.readString() 1271 frame.object = f.readString() 1272 1273 return frame 1274 case "TYPE": 1275 frame := &schemaChangeType{ 1276 frameHeader: *f.header, 1277 change: change, 1278 } 1279 1280 frame.keyspace = f.readString() 1281 frame.object = f.readString() 1282 1283 return frame 1284 case "FUNCTION": 1285 frame := &schemaChangeFunction{ 1286 frameHeader: *f.header, 1287 change: change, 1288 } 1289 1290 frame.keyspace = f.readString() 1291 frame.name = f.readString() 1292 frame.args = f.readStringList() 1293 1294 return frame 1295 case "AGGREGATE": 1296 frame := &schemaChangeAggregate{ 1297 frameHeader: *f.header, 1298 change: change, 1299 } 1300 1301 frame.keyspace = f.readString() 1302 frame.name = f.readString() 1303 frame.args = f.readStringList() 1304 1305 return frame 1306 default: 1307 panic(fmt.Errorf("gocql: unknown SCHEMA_CHANGE target: %q change: %q", target, change)) 1308 } 1309 } 1310 1311} 1312 1313type authenticateFrame struct { 1314 frameHeader 1315 1316 class string 1317} 1318 1319func (a *authenticateFrame) String() string { 1320 return fmt.Sprintf("[authenticate class=%q]", a.class) 1321} 1322 1323func (f *framer) parseAuthenticateFrame() frame { 1324 return &authenticateFrame{ 1325 frameHeader: *f.header, 1326 class: f.readString(), 1327 } 1328} 1329 1330type authSuccessFrame struct { 1331 frameHeader 1332 1333 data []byte 1334} 1335 1336func (a *authSuccessFrame) String() string { 1337 return fmt.Sprintf("[auth_success data=%q]", a.data) 1338} 1339 1340func (f *framer) parseAuthSuccessFrame() frame { 1341 return &authSuccessFrame{ 1342 frameHeader: *f.header, 1343 data: f.readBytes(), 1344 } 1345} 1346 1347type authChallengeFrame struct { 1348 frameHeader 1349 1350 data []byte 1351} 1352 1353func (a *authChallengeFrame) String() string { 1354 return fmt.Sprintf("[auth_challenge data=%q]", a.data) 1355} 1356 1357func (f *framer) parseAuthChallengeFrame() frame { 1358 return &authChallengeFrame{ 1359 frameHeader: *f.header, 1360 data: f.readBytes(), 1361 } 1362} 1363 1364type statusChangeEventFrame struct { 1365 frameHeader 1366 1367 change string 1368 host net.IP 1369 port int 1370} 1371 1372func (t statusChangeEventFrame) String() string { 1373 return fmt.Sprintf("[status_change change=%s host=%v port=%v]", t.change, t.host, t.port) 1374} 1375 1376// essentially the same as statusChange 1377type topologyChangeEventFrame struct { 1378 frameHeader 1379 1380 change string 1381 host net.IP 1382 port int 1383} 1384 1385func (t topologyChangeEventFrame) String() string { 1386 return fmt.Sprintf("[topology_change change=%s host=%v port=%v]", t.change, t.host, t.port) 1387} 1388 1389func (f *framer) parseEventFrame() frame { 1390 eventType := f.readString() 1391 1392 switch eventType { 1393 case "TOPOLOGY_CHANGE": 1394 frame := &topologyChangeEventFrame{frameHeader: *f.header} 1395 frame.change = f.readString() 1396 frame.host, frame.port = f.readInet() 1397 1398 return frame 1399 case "STATUS_CHANGE": 1400 frame := &statusChangeEventFrame{frameHeader: *f.header} 1401 frame.change = f.readString() 1402 frame.host, frame.port = f.readInet() 1403 1404 return frame 1405 case "SCHEMA_CHANGE": 1406 // this should work for all versions 1407 return f.parseResultSchemaChange() 1408 default: 1409 panic(fmt.Errorf("gocql: unknown event type: %q", eventType)) 1410 } 1411 1412} 1413 1414type writeAuthResponseFrame struct { 1415 data []byte 1416} 1417 1418func (a *writeAuthResponseFrame) String() string { 1419 return fmt.Sprintf("[auth_response data=%q]", a.data) 1420} 1421 1422func (a *writeAuthResponseFrame) writeFrame(framer *framer, streamID int) error { 1423 return framer.writeAuthResponseFrame(streamID, a.data) 1424} 1425 1426func (f *framer) writeAuthResponseFrame(streamID int, data []byte) error { 1427 f.writeHeader(f.flags, opAuthResponse, streamID) 1428 f.writeBytes(data) 1429 return f.finishWrite() 1430} 1431 1432type queryValues struct { 1433 value []byte 1434 1435 // optional name, will set With names for values flag 1436 name string 1437 isUnset bool 1438} 1439 1440type queryParams struct { 1441 consistency Consistency 1442 // v2+ 1443 skipMeta bool 1444 values []queryValues 1445 pageSize int 1446 pagingState []byte 1447 serialConsistency SerialConsistency 1448 // v3+ 1449 defaultTimestamp bool 1450 defaultTimestampValue int64 1451 // v5+ 1452 keyspace string 1453} 1454 1455func (q queryParams) String() string { 1456 return fmt.Sprintf("[query_params consistency=%v skip_meta=%v page_size=%d paging_state=%q serial_consistency=%v default_timestamp=%v values=%v keyspace=%s]", 1457 q.consistency, q.skipMeta, q.pageSize, q.pagingState, q.serialConsistency, q.defaultTimestamp, q.values, q.keyspace) 1458} 1459 1460func (f *framer) writeQueryParams(opts *queryParams) { 1461 f.writeConsistency(opts.consistency) 1462 1463 if f.proto == protoVersion1 { 1464 return 1465 } 1466 1467 var flags byte 1468 if len(opts.values) > 0 { 1469 flags |= flagValues 1470 } 1471 if opts.skipMeta { 1472 flags |= flagSkipMetaData 1473 } 1474 if opts.pageSize > 0 { 1475 flags |= flagPageSize 1476 } 1477 if len(opts.pagingState) > 0 { 1478 flags |= flagWithPagingState 1479 } 1480 if opts.serialConsistency > 0 { 1481 flags |= flagWithSerialConsistency 1482 } 1483 1484 names := false 1485 1486 // protoV3 specific things 1487 if f.proto > protoVersion2 { 1488 if opts.defaultTimestamp { 1489 flags |= flagDefaultTimestamp 1490 } 1491 1492 if len(opts.values) > 0 && opts.values[0].name != "" { 1493 flags |= flagWithNameValues 1494 names = true 1495 } 1496 } 1497 1498 if opts.keyspace != "" { 1499 if f.proto > protoVersion4 { 1500 flags |= flagWithKeyspace 1501 } else { 1502 panic(fmt.Errorf("The keyspace can only be set with protocol 5 or higher")) 1503 } 1504 } 1505 1506 if f.proto > protoVersion4 { 1507 f.writeUint(uint32(flags)) 1508 } else { 1509 f.writeByte(flags) 1510 } 1511 1512 if n := len(opts.values); n > 0 { 1513 f.writeShort(uint16(n)) 1514 1515 for i := 0; i < n; i++ { 1516 if names { 1517 f.writeString(opts.values[i].name) 1518 } 1519 if opts.values[i].isUnset { 1520 f.writeUnset() 1521 } else { 1522 f.writeBytes(opts.values[i].value) 1523 } 1524 } 1525 } 1526 1527 if opts.pageSize > 0 { 1528 f.writeInt(int32(opts.pageSize)) 1529 } 1530 1531 if len(opts.pagingState) > 0 { 1532 f.writeBytes(opts.pagingState) 1533 } 1534 1535 if opts.serialConsistency > 0 { 1536 f.writeConsistency(Consistency(opts.serialConsistency)) 1537 } 1538 1539 if f.proto > protoVersion2 && opts.defaultTimestamp { 1540 // timestamp in microseconds 1541 var ts int64 1542 if opts.defaultTimestampValue != 0 { 1543 ts = opts.defaultTimestampValue 1544 } else { 1545 ts = time.Now().UnixNano() / 1000 1546 } 1547 f.writeLong(ts) 1548 } 1549 1550 if opts.keyspace != "" { 1551 f.writeString(opts.keyspace) 1552 } 1553} 1554 1555type writeQueryFrame struct { 1556 statement string 1557 params queryParams 1558 1559 // v4+ 1560 customPayload map[string][]byte 1561} 1562 1563func (w *writeQueryFrame) String() string { 1564 return fmt.Sprintf("[query statement=%q params=%v]", w.statement, w.params) 1565} 1566 1567func (w *writeQueryFrame) writeFrame(framer *framer, streamID int) error { 1568 return framer.writeQueryFrame(streamID, w.statement, &w.params, w.customPayload) 1569} 1570 1571func (f *framer) writeQueryFrame(streamID int, statement string, params *queryParams, customPayload map[string][]byte) error { 1572 if len(customPayload) > 0 { 1573 f.payload() 1574 } 1575 f.writeHeader(f.flags, opQuery, streamID) 1576 f.writeCustomPayload(&customPayload) 1577 f.writeLongString(statement) 1578 f.writeQueryParams(params) 1579 1580 return f.finishWrite() 1581} 1582 1583type frameWriter interface { 1584 writeFrame(framer *framer, streamID int) error 1585} 1586 1587type frameWriterFunc func(framer *framer, streamID int) error 1588 1589func (f frameWriterFunc) writeFrame(framer *framer, streamID int) error { 1590 return f(framer, streamID) 1591} 1592 1593type writeExecuteFrame struct { 1594 preparedID []byte 1595 params queryParams 1596 1597 // v4+ 1598 customPayload map[string][]byte 1599} 1600 1601func (e *writeExecuteFrame) String() string { 1602 return fmt.Sprintf("[execute id=% X params=%v]", e.preparedID, &e.params) 1603} 1604 1605func (e *writeExecuteFrame) writeFrame(fr *framer, streamID int) error { 1606 return fr.writeExecuteFrame(streamID, e.preparedID, &e.params, &e.customPayload) 1607} 1608 1609func (f *framer) writeExecuteFrame(streamID int, preparedID []byte, params *queryParams, customPayload *map[string][]byte) error { 1610 if len(*customPayload) > 0 { 1611 f.payload() 1612 } 1613 f.writeHeader(f.flags, opExecute, streamID) 1614 f.writeCustomPayload(customPayload) 1615 f.writeShortBytes(preparedID) 1616 if f.proto > protoVersion1 { 1617 f.writeQueryParams(params) 1618 } else { 1619 n := len(params.values) 1620 f.writeShort(uint16(n)) 1621 for i := 0; i < n; i++ { 1622 if params.values[i].isUnset { 1623 f.writeUnset() 1624 } else { 1625 f.writeBytes(params.values[i].value) 1626 } 1627 } 1628 f.writeConsistency(params.consistency) 1629 } 1630 1631 return f.finishWrite() 1632} 1633 1634// TODO: can we replace BatchStatemt with batchStatement? As they prety much 1635// duplicate each other 1636type batchStatment struct { 1637 preparedID []byte 1638 statement string 1639 values []queryValues 1640} 1641 1642type writeBatchFrame struct { 1643 typ BatchType 1644 statements []batchStatment 1645 consistency Consistency 1646 1647 // v3+ 1648 serialConsistency SerialConsistency 1649 defaultTimestamp bool 1650 defaultTimestampValue int64 1651 1652 //v4+ 1653 customPayload map[string][]byte 1654} 1655 1656func (w *writeBatchFrame) writeFrame(framer *framer, streamID int) error { 1657 return framer.writeBatchFrame(streamID, w, w.customPayload) 1658} 1659 1660func (f *framer) writeBatchFrame(streamID int, w *writeBatchFrame, customPayload map[string][]byte) error { 1661 if len(customPayload) > 0 { 1662 f.payload() 1663 } 1664 f.writeHeader(f.flags, opBatch, streamID) 1665 f.writeCustomPayload(&customPayload) 1666 f.writeByte(byte(w.typ)) 1667 1668 n := len(w.statements) 1669 f.writeShort(uint16(n)) 1670 1671 var flags byte 1672 1673 for i := 0; i < n; i++ { 1674 b := &w.statements[i] 1675 if len(b.preparedID) == 0 { 1676 f.writeByte(0) 1677 f.writeLongString(b.statement) 1678 } else { 1679 f.writeByte(1) 1680 f.writeShortBytes(b.preparedID) 1681 } 1682 1683 f.writeShort(uint16(len(b.values))) 1684 for j := range b.values { 1685 col := b.values[j] 1686 if f.proto > protoVersion2 && col.name != "" { 1687 // TODO: move this check into the caller and set a flag on writeBatchFrame 1688 // to indicate using named values 1689 if f.proto <= protoVersion5 { 1690 return fmt.Errorf("gocql: named query values are not supported in batches, please see https://issues.apache.org/jira/browse/CASSANDRA-10246") 1691 } 1692 flags |= flagWithNameValues 1693 f.writeString(col.name) 1694 } 1695 if col.isUnset { 1696 f.writeUnset() 1697 } else { 1698 f.writeBytes(col.value) 1699 } 1700 } 1701 } 1702 1703 f.writeConsistency(w.consistency) 1704 1705 if f.proto > protoVersion2 { 1706 if w.serialConsistency > 0 { 1707 flags |= flagWithSerialConsistency 1708 } 1709 if w.defaultTimestamp { 1710 flags |= flagDefaultTimestamp 1711 } 1712 1713 if f.proto > protoVersion4 { 1714 f.writeUint(uint32(flags)) 1715 } else { 1716 f.writeByte(flags) 1717 } 1718 1719 if w.serialConsistency > 0 { 1720 f.writeConsistency(Consistency(w.serialConsistency)) 1721 } 1722 1723 if w.defaultTimestamp { 1724 var ts int64 1725 if w.defaultTimestampValue != 0 { 1726 ts = w.defaultTimestampValue 1727 } else { 1728 ts = time.Now().UnixNano() / 1000 1729 } 1730 f.writeLong(ts) 1731 } 1732 } 1733 1734 return f.finishWrite() 1735} 1736 1737type writeOptionsFrame struct{} 1738 1739func (w *writeOptionsFrame) writeFrame(framer *framer, streamID int) error { 1740 return framer.writeOptionsFrame(streamID, w) 1741} 1742 1743func (f *framer) writeOptionsFrame(stream int, _ *writeOptionsFrame) error { 1744 f.writeHeader(f.flags&^flagCompress, opOptions, stream) 1745 return f.finishWrite() 1746} 1747 1748type writeRegisterFrame struct { 1749 events []string 1750} 1751 1752func (w *writeRegisterFrame) writeFrame(framer *framer, streamID int) error { 1753 return framer.writeRegisterFrame(streamID, w) 1754} 1755 1756func (f *framer) writeRegisterFrame(streamID int, w *writeRegisterFrame) error { 1757 f.writeHeader(f.flags, opRegister, streamID) 1758 f.writeStringList(w.events) 1759 1760 return f.finishWrite() 1761} 1762 1763func (f *framer) readByte() byte { 1764 if len(f.rbuf) < 1 { 1765 panic(fmt.Errorf("not enough bytes in buffer to read byte require 1 got: %d", len(f.rbuf))) 1766 } 1767 1768 b := f.rbuf[0] 1769 f.rbuf = f.rbuf[1:] 1770 return b 1771} 1772 1773func (f *framer) readInt() (n int) { 1774 if len(f.rbuf) < 4 { 1775 panic(fmt.Errorf("not enough bytes in buffer to read int require 4 got: %d", len(f.rbuf))) 1776 } 1777 1778 n = int(int32(f.rbuf[0])<<24 | int32(f.rbuf[1])<<16 | int32(f.rbuf[2])<<8 | int32(f.rbuf[3])) 1779 f.rbuf = f.rbuf[4:] 1780 return 1781} 1782 1783func (f *framer) readShort() (n uint16) { 1784 if len(f.rbuf) < 2 { 1785 panic(fmt.Errorf("not enough bytes in buffer to read short require 2 got: %d", len(f.rbuf))) 1786 } 1787 n = uint16(f.rbuf[0])<<8 | uint16(f.rbuf[1]) 1788 f.rbuf = f.rbuf[2:] 1789 return 1790} 1791 1792func (f *framer) readLong() (n int64) { 1793 if len(f.rbuf) < 8 { 1794 panic(fmt.Errorf("not enough bytes in buffer to read long require 8 got: %d", len(f.rbuf))) 1795 } 1796 n = int64(f.rbuf[0])<<56 | int64(f.rbuf[1])<<48 | int64(f.rbuf[2])<<40 | int64(f.rbuf[3])<<32 | 1797 int64(f.rbuf[4])<<24 | int64(f.rbuf[5])<<16 | int64(f.rbuf[6])<<8 | int64(f.rbuf[7]) 1798 f.rbuf = f.rbuf[8:] 1799 return 1800} 1801 1802func (f *framer) readString() (s string) { 1803 size := f.readShort() 1804 1805 if len(f.rbuf) < int(size) { 1806 panic(fmt.Errorf("not enough bytes in buffer to read string require %d got: %d", size, len(f.rbuf))) 1807 } 1808 1809 s = string(f.rbuf[:size]) 1810 f.rbuf = f.rbuf[size:] 1811 return 1812} 1813 1814func (f *framer) readLongString() (s string) { 1815 size := f.readInt() 1816 1817 if len(f.rbuf) < size { 1818 panic(fmt.Errorf("not enough bytes in buffer to read long string require %d got: %d", size, len(f.rbuf))) 1819 } 1820 1821 s = string(f.rbuf[:size]) 1822 f.rbuf = f.rbuf[size:] 1823 return 1824} 1825 1826func (f *framer) readUUID() *UUID { 1827 if len(f.rbuf) < 16 { 1828 panic(fmt.Errorf("not enough bytes in buffer to read uuid require %d got: %d", 16, len(f.rbuf))) 1829 } 1830 1831 // TODO: how to handle this error, if it is a uuid, then sureley, problems? 1832 u, _ := UUIDFromBytes(f.rbuf[:16]) 1833 f.rbuf = f.rbuf[16:] 1834 return &u 1835} 1836 1837func (f *framer) readStringList() []string { 1838 size := f.readShort() 1839 1840 l := make([]string, size) 1841 for i := 0; i < int(size); i++ { 1842 l[i] = f.readString() 1843 } 1844 1845 return l 1846} 1847 1848func (f *framer) readBytesInternal() ([]byte, error) { 1849 size := f.readInt() 1850 if size < 0 { 1851 return nil, nil 1852 } 1853 1854 if len(f.rbuf) < size { 1855 return nil, fmt.Errorf("not enough bytes in buffer to read bytes require %d got: %d", size, len(f.rbuf)) 1856 } 1857 1858 l := f.rbuf[:size] 1859 f.rbuf = f.rbuf[size:] 1860 1861 return l, nil 1862} 1863 1864func (f *framer) readBytes() []byte { 1865 l, err := f.readBytesInternal() 1866 if err != nil { 1867 panic(err) 1868 } 1869 1870 return l 1871} 1872 1873func (f *framer) readShortBytes() []byte { 1874 size := f.readShort() 1875 if len(f.rbuf) < int(size) { 1876 panic(fmt.Errorf("not enough bytes in buffer to read short bytes: require %d got %d", size, len(f.rbuf))) 1877 } 1878 1879 l := f.rbuf[:size] 1880 f.rbuf = f.rbuf[size:] 1881 1882 return l 1883} 1884 1885func (f *framer) readInetAdressOnly() net.IP { 1886 if len(f.rbuf) < 1 { 1887 panic(fmt.Errorf("not enough bytes in buffer to read inet size require %d got: %d", 1, len(f.rbuf))) 1888 } 1889 1890 size := f.rbuf[0] 1891 f.rbuf = f.rbuf[1:] 1892 1893 if !(size == 4 || size == 16) { 1894 panic(fmt.Errorf("invalid IP size: %d", size)) 1895 } 1896 1897 if len(f.rbuf) < 1 { 1898 panic(fmt.Errorf("not enough bytes in buffer to read inet require %d got: %d", size, len(f.rbuf))) 1899 } 1900 1901 ip := make([]byte, size) 1902 copy(ip, f.rbuf[:size]) 1903 f.rbuf = f.rbuf[size:] 1904 return net.IP(ip) 1905} 1906 1907func (f *framer) readInet() (net.IP, int) { 1908 return f.readInetAdressOnly(), f.readInt() 1909} 1910 1911func (f *framer) readConsistency() Consistency { 1912 return Consistency(f.readShort()) 1913} 1914 1915func (f *framer) readStringMap() map[string]string { 1916 size := f.readShort() 1917 m := make(map[string]string, size) 1918 1919 for i := 0; i < int(size); i++ { 1920 k := f.readString() 1921 v := f.readString() 1922 m[k] = v 1923 } 1924 1925 return m 1926} 1927 1928func (f *framer) readBytesMap() map[string][]byte { 1929 size := f.readShort() 1930 m := make(map[string][]byte, size) 1931 1932 for i := 0; i < int(size); i++ { 1933 k := f.readString() 1934 v := f.readBytes() 1935 m[k] = v 1936 } 1937 1938 return m 1939} 1940 1941func (f *framer) readStringMultiMap() map[string][]string { 1942 size := f.readShort() 1943 m := make(map[string][]string, size) 1944 1945 for i := 0; i < int(size); i++ { 1946 k := f.readString() 1947 v := f.readStringList() 1948 m[k] = v 1949 } 1950 1951 return m 1952} 1953 1954func (f *framer) writeByte(b byte) { 1955 f.wbuf = append(f.wbuf, b) 1956} 1957 1958func appendBytes(p []byte, d []byte) []byte { 1959 if d == nil { 1960 return appendInt(p, -1) 1961 } 1962 p = appendInt(p, int32(len(d))) 1963 p = append(p, d...) 1964 return p 1965} 1966 1967func appendShort(p []byte, n uint16) []byte { 1968 return append(p, 1969 byte(n>>8), 1970 byte(n), 1971 ) 1972} 1973 1974func appendInt(p []byte, n int32) []byte { 1975 return append(p, byte(n>>24), 1976 byte(n>>16), 1977 byte(n>>8), 1978 byte(n)) 1979} 1980 1981func appendUint(p []byte, n uint32) []byte { 1982 return append(p, byte(n>>24), 1983 byte(n>>16), 1984 byte(n>>8), 1985 byte(n)) 1986} 1987 1988func appendLong(p []byte, n int64) []byte { 1989 return append(p, 1990 byte(n>>56), 1991 byte(n>>48), 1992 byte(n>>40), 1993 byte(n>>32), 1994 byte(n>>24), 1995 byte(n>>16), 1996 byte(n>>8), 1997 byte(n), 1998 ) 1999} 2000 2001func (f *framer) writeCustomPayload(customPayload *map[string][]byte) { 2002 if len(*customPayload) > 0 { 2003 if f.proto < protoVersion4 { 2004 panic("Custom payload is not supported with version V3 or less") 2005 } 2006 f.writeBytesMap(*customPayload) 2007 } 2008} 2009 2010// these are protocol level binary types 2011func (f *framer) writeInt(n int32) { 2012 f.wbuf = appendInt(f.wbuf, n) 2013} 2014 2015func (f *framer) writeUint(n uint32) { 2016 f.wbuf = appendUint(f.wbuf, n) 2017} 2018 2019func (f *framer) writeShort(n uint16) { 2020 f.wbuf = appendShort(f.wbuf, n) 2021} 2022 2023func (f *framer) writeLong(n int64) { 2024 f.wbuf = appendLong(f.wbuf, n) 2025} 2026 2027func (f *framer) writeString(s string) { 2028 f.writeShort(uint16(len(s))) 2029 f.wbuf = append(f.wbuf, s...) 2030} 2031 2032func (f *framer) writeLongString(s string) { 2033 f.writeInt(int32(len(s))) 2034 f.wbuf = append(f.wbuf, s...) 2035} 2036 2037func (f *framer) writeUUID(u *UUID) { 2038 f.wbuf = append(f.wbuf, u[:]...) 2039} 2040 2041func (f *framer) writeStringList(l []string) { 2042 f.writeShort(uint16(len(l))) 2043 for _, s := range l { 2044 f.writeString(s) 2045 } 2046} 2047 2048func (f *framer) writeUnset() { 2049 // Protocol version 4 specifies that bind variables do not require having a 2050 // value when executing a statement. Bind variables without a value are 2051 // called 'unset'. The 'unset' bind variable is serialized as the int 2052 // value '-2' without following bytes. 2053 f.writeInt(-2) 2054} 2055 2056func (f *framer) writeBytes(p []byte) { 2057 // TODO: handle null case correctly, 2058 // [bytes] A [int] n, followed by n bytes if n >= 0. If n < 0, 2059 // no byte should follow and the value represented is `null`. 2060 if p == nil { 2061 f.writeInt(-1) 2062 } else { 2063 f.writeInt(int32(len(p))) 2064 f.wbuf = append(f.wbuf, p...) 2065 } 2066} 2067 2068func (f *framer) writeShortBytes(p []byte) { 2069 f.writeShort(uint16(len(p))) 2070 f.wbuf = append(f.wbuf, p...) 2071} 2072 2073func (f *framer) writeInet(ip net.IP, port int) { 2074 f.wbuf = append(f.wbuf, 2075 byte(len(ip)), 2076 ) 2077 2078 f.wbuf = append(f.wbuf, 2079 []byte(ip)..., 2080 ) 2081 2082 f.writeInt(int32(port)) 2083} 2084 2085func (f *framer) writeConsistency(cons Consistency) { 2086 f.writeShort(uint16(cons)) 2087} 2088 2089func (f *framer) writeStringMap(m map[string]string) { 2090 f.writeShort(uint16(len(m))) 2091 for k, v := range m { 2092 f.writeString(k) 2093 f.writeString(v) 2094 } 2095} 2096 2097func (f *framer) writeBytesMap(m map[string][]byte) { 2098 f.writeShort(uint16(len(m))) 2099 for k, v := range m { 2100 f.writeString(k) 2101 f.writeBytes(v) 2102 } 2103} 2104