1// Copyright (c) 2012 The gocql Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style 3// license that can be found in the LICENSE file. 4 5package gocql 6 7import ( 8 "context" 9 "errors" 10 "fmt" 11 "io" 12 "io/ioutil" 13 "net" 14 "runtime" 15 "strings" 16 "time" 17) 18 19type unsetColumn struct{} 20 21// UnsetValue represents a value used in a query binding that will be ignored by Cassandra. 22// 23// By setting a field to the unset value Cassandra will ignore the write completely. 24// The main advantage is the ability to keep the same prepared statement even when you don't 25// want to update some fields, where before you needed to make another prepared statement. 26// 27// UnsetValue is only available when using the version 4 of the protocol. 28var UnsetValue = unsetColumn{} 29 30type namedValue struct { 31 name string 32 value interface{} 33} 34 35// NamedValue produce a value which will bind to the named parameter in a query 36func NamedValue(name string, value interface{}) interface{} { 37 return &namedValue{ 38 name: name, 39 value: value, 40 } 41} 42 43const ( 44 protoDirectionMask = 0x80 45 protoVersionMask = 0x7F 46 protoVersion1 = 0x01 47 protoVersion2 = 0x02 48 protoVersion3 = 0x03 49 protoVersion4 = 0x04 50 protoVersion5 = 0x05 51 52 maxFrameSize = 256 * 1024 * 1024 53) 54 55type protoVersion byte 56 57func (p protoVersion) request() bool { 58 return p&protoDirectionMask == 0x00 59} 60 61func (p protoVersion) response() bool { 62 return p&protoDirectionMask == 0x80 63} 64 65func (p protoVersion) version() byte { 66 return byte(p) & protoVersionMask 67} 68 69func (p protoVersion) String() string { 70 dir := "REQ" 71 if p.response() { 72 dir = "RESP" 73 } 74 75 return fmt.Sprintf("[version=%d direction=%s]", p.version(), dir) 76} 77 78type frameOp byte 79 80const ( 81 // header ops 82 opError frameOp = 0x00 83 opStartup frameOp = 0x01 84 opReady frameOp = 0x02 85 opAuthenticate frameOp = 0x03 86 opOptions frameOp = 0x05 87 opSupported frameOp = 0x06 88 opQuery frameOp = 0x07 89 opResult frameOp = 0x08 90 opPrepare frameOp = 0x09 91 opExecute frameOp = 0x0A 92 opRegister frameOp = 0x0B 93 opEvent frameOp = 0x0C 94 opBatch frameOp = 0x0D 95 opAuthChallenge frameOp = 0x0E 96 opAuthResponse frameOp = 0x0F 97 opAuthSuccess frameOp = 0x10 98) 99 100func (f frameOp) String() string { 101 switch f { 102 case opError: 103 return "ERROR" 104 case opStartup: 105 return "STARTUP" 106 case opReady: 107 return "READY" 108 case opAuthenticate: 109 return "AUTHENTICATE" 110 case opOptions: 111 return "OPTIONS" 112 case opSupported: 113 return "SUPPORTED" 114 case opQuery: 115 return "QUERY" 116 case opResult: 117 return "RESULT" 118 case opPrepare: 119 return "PREPARE" 120 case opExecute: 121 return "EXECUTE" 122 case opRegister: 123 return "REGISTER" 124 case opEvent: 125 return "EVENT" 126 case opBatch: 127 return "BATCH" 128 case opAuthChallenge: 129 return "AUTH_CHALLENGE" 130 case opAuthResponse: 131 return "AUTH_RESPONSE" 132 case opAuthSuccess: 133 return "AUTH_SUCCESS" 134 default: 135 return fmt.Sprintf("UNKNOWN_OP_%d", f) 136 } 137} 138 139const ( 140 // result kind 141 resultKindVoid = 1 142 resultKindRows = 2 143 resultKindKeyspace = 3 144 resultKindPrepared = 4 145 resultKindSchemaChanged = 5 146 147 // rows flags 148 flagGlobalTableSpec int = 0x01 149 flagHasMorePages int = 0x02 150 flagNoMetaData int = 0x04 151 152 // query flags 153 flagValues byte = 0x01 154 flagSkipMetaData byte = 0x02 155 flagPageSize byte = 0x04 156 flagWithPagingState byte = 0x08 157 flagWithSerialConsistency byte = 0x10 158 flagDefaultTimestamp byte = 0x20 159 flagWithNameValues byte = 0x40 160 flagWithKeyspace byte = 0x80 161 162 // prepare flags 163 flagWithPreparedKeyspace uint32 = 0x01 164 165 // header flags 166 flagCompress byte = 0x01 167 flagTracing byte = 0x02 168 flagCustomPayload byte = 0x04 169 flagWarning byte = 0x08 170 flagBetaProtocol byte = 0x10 171) 172 173type Consistency uint16 174 175const ( 176 Any Consistency = 0x00 177 One Consistency = 0x01 178 Two Consistency = 0x02 179 Three Consistency = 0x03 180 Quorum Consistency = 0x04 181 All Consistency = 0x05 182 LocalQuorum Consistency = 0x06 183 EachQuorum Consistency = 0x07 184 LocalOne Consistency = 0x0A 185) 186 187func (c Consistency) String() string { 188 switch c { 189 case Any: 190 return "ANY" 191 case One: 192 return "ONE" 193 case Two: 194 return "TWO" 195 case Three: 196 return "THREE" 197 case Quorum: 198 return "QUORUM" 199 case All: 200 return "ALL" 201 case LocalQuorum: 202 return "LOCAL_QUORUM" 203 case EachQuorum: 204 return "EACH_QUORUM" 205 case LocalOne: 206 return "LOCAL_ONE" 207 default: 208 return fmt.Sprintf("UNKNOWN_CONS_0x%x", uint16(c)) 209 } 210} 211 212func (c Consistency) MarshalText() (text []byte, err error) { 213 return []byte(c.String()), nil 214} 215 216func (c *Consistency) UnmarshalText(text []byte) error { 217 switch string(text) { 218 case "ANY": 219 *c = Any 220 case "ONE": 221 *c = One 222 case "TWO": 223 *c = Two 224 case "THREE": 225 *c = Three 226 case "QUORUM": 227 *c = Quorum 228 case "ALL": 229 *c = All 230 case "LOCAL_QUORUM": 231 *c = LocalQuorum 232 case "EACH_QUORUM": 233 *c = EachQuorum 234 case "LOCAL_ONE": 235 *c = LocalOne 236 default: 237 return fmt.Errorf("invalid consistency %q", string(text)) 238 } 239 240 return nil 241} 242 243func ParseConsistency(s string) Consistency { 244 var c Consistency 245 if err := c.UnmarshalText([]byte(strings.ToUpper(s))); err != nil { 246 panic(err) 247 } 248 return c 249} 250 251// ParseConsistencyWrapper wraps gocql.ParseConsistency to provide an err 252// return instead of a panic 253func ParseConsistencyWrapper(s string) (consistency Consistency, err error) { 254 err = consistency.UnmarshalText([]byte(strings.ToUpper(s))) 255 return 256} 257 258// MustParseConsistency is the same as ParseConsistency except it returns 259// an error (never). It is kept here since breaking changes are not good. 260// DEPRECATED: use ParseConsistency if you want a panic on parse error. 261func MustParseConsistency(s string) (Consistency, error) { 262 c, err := ParseConsistencyWrapper(s) 263 if err != nil { 264 panic(err) 265 } 266 return c, nil 267} 268 269type SerialConsistency uint16 270 271const ( 272 Serial SerialConsistency = 0x08 273 LocalSerial SerialConsistency = 0x09 274) 275 276func (s SerialConsistency) String() string { 277 switch s { 278 case Serial: 279 return "SERIAL" 280 case LocalSerial: 281 return "LOCAL_SERIAL" 282 default: 283 return fmt.Sprintf("UNKNOWN_SERIAL_CONS_0x%x", uint16(s)) 284 } 285} 286 287func (s SerialConsistency) MarshalText() (text []byte, err error) { 288 return []byte(s.String()), nil 289} 290 291func (s *SerialConsistency) UnmarshalText(text []byte) error { 292 switch string(text) { 293 case "SERIAL": 294 *s = Serial 295 case "LOCAL_SERIAL": 296 *s = LocalSerial 297 default: 298 return fmt.Errorf("invalid consistency %q", string(text)) 299 } 300 301 return nil 302} 303 304const ( 305 apacheCassandraTypePrefix = "org.apache.cassandra.db.marshal." 306) 307 308var ( 309 ErrFrameTooBig = errors.New("frame length is bigger than the maximum allowed") 310) 311 312const maxFrameHeaderSize = 9 313 314func readInt(p []byte) int32 { 315 return int32(p[0])<<24 | int32(p[1])<<16 | int32(p[2])<<8 | int32(p[3]) 316} 317 318type frameHeader struct { 319 version protoVersion 320 flags byte 321 stream int 322 op frameOp 323 length int 324 warnings []string 325} 326 327func (f frameHeader) String() string { 328 return fmt.Sprintf("[header version=%s flags=0x%x stream=%d op=%s length=%d]", f.version, f.flags, f.stream, f.op, f.length) 329} 330 331func (f frameHeader) Header() frameHeader { 332 return f 333} 334 335const defaultBufSize = 128 336 337type ObservedFrameHeader struct { 338 Version protoVersion 339 Flags byte 340 Stream int16 341 Opcode frameOp 342 Length int32 343 344 // StartHeader is the time we started reading the frame header off the network connection. 345 Start time.Time 346 // EndHeader is the time we finished reading the frame header off the network connection. 347 End time.Time 348 349 // Host is Host of the connection the frame header was read from. 350 Host *HostInfo 351} 352 353func (f ObservedFrameHeader) String() string { 354 return fmt.Sprintf("[observed header version=%s flags=0x%x stream=%d op=%s length=%d]", f.Version, f.Flags, f.Stream, f.Opcode, f.Length) 355} 356 357// FrameHeaderObserver is the interface implemented by frame observers / stat collectors. 358// 359// Experimental, this interface and use may change 360type FrameHeaderObserver interface { 361 // ObserveFrameHeader gets called on every received frame header. 362 ObserveFrameHeader(context.Context, ObservedFrameHeader) 363} 364 365// a framer is responsible for reading, writing and parsing frames on a single stream 366type framer struct { 367 r io.Reader 368 w io.Writer 369 370 proto byte 371 // flags are for outgoing flags, enabling compression and tracing etc 372 flags byte 373 compres Compressor 374 headSize int 375 // if this frame was read then the header will be here 376 header *frameHeader 377 378 // if tracing flag is set this is not nil 379 traceID []byte 380 381 // holds a ref to the whole byte slice for rbuf so that it can be reset to 382 // 0 after a read. 383 readBuffer []byte 384 385 rbuf []byte 386 wbuf []byte 387 388 customPayload map[string][]byte 389} 390 391func newFramer(r io.Reader, w io.Writer, compressor Compressor, version byte) *framer { 392 f := &framer{ 393 wbuf: make([]byte, defaultBufSize), 394 readBuffer: make([]byte, defaultBufSize), 395 } 396 var flags byte 397 if compressor != nil { 398 flags |= flagCompress 399 } 400 if version == protoVersion5 { 401 flags |= flagBetaProtocol 402 } 403 404 version &= protoVersionMask 405 406 headSize := 8 407 if version > protoVersion2 { 408 headSize = 9 409 } 410 411 f.compres = compressor 412 f.proto = version 413 f.flags = flags 414 f.headSize = headSize 415 416 f.r = r 417 f.rbuf = f.readBuffer[:0] 418 419 f.w = w 420 f.wbuf = f.wbuf[:0] 421 422 f.header = nil 423 f.traceID = nil 424 425 return f 426} 427 428type frame interface { 429 Header() frameHeader 430} 431 432func readHeader(r io.Reader, p []byte) (head frameHeader, err error) { 433 _, err = io.ReadFull(r, p[:1]) 434 if err != nil { 435 return frameHeader{}, err 436 } 437 438 version := p[0] & protoVersionMask 439 440 if version < protoVersion1 || version > protoVersion5 { 441 return frameHeader{}, fmt.Errorf("gocql: unsupported protocol response version: %d", version) 442 } 443 444 headSize := 9 445 if version < protoVersion3 { 446 headSize = 8 447 } 448 449 _, err = io.ReadFull(r, p[1:headSize]) 450 if err != nil { 451 return frameHeader{}, err 452 } 453 454 p = p[:headSize] 455 456 head.version = protoVersion(p[0]) 457 head.flags = p[1] 458 459 if version > protoVersion2 { 460 if len(p) != 9 { 461 return frameHeader{}, fmt.Errorf("not enough bytes to read header require 9 got: %d", len(p)) 462 } 463 464 head.stream = int(int16(p[2])<<8 | int16(p[3])) 465 head.op = frameOp(p[4]) 466 head.length = int(readInt(p[5:])) 467 } else { 468 if len(p) != 8 { 469 return frameHeader{}, fmt.Errorf("not enough bytes to read header require 8 got: %d", len(p)) 470 } 471 472 head.stream = int(int8(p[2])) 473 head.op = frameOp(p[3]) 474 head.length = int(readInt(p[4:])) 475 } 476 477 return head, nil 478} 479 480// explicitly enables tracing for the framers outgoing requests 481func (f *framer) trace() { 482 f.flags |= flagTracing 483} 484 485// explicitly enables the custom payload flag 486func (f *framer) payload() { 487 f.flags |= flagCustomPayload 488} 489 490// reads a frame form the wire into the framers buffer 491func (f *framer) readFrame(head *frameHeader) error { 492 if head.length < 0 { 493 return fmt.Errorf("frame body length can not be less than 0: %d", head.length) 494 } else if head.length > maxFrameSize { 495 // need to free up the connection to be used again 496 _, err := io.CopyN(ioutil.Discard, f.r, int64(head.length)) 497 if err != nil { 498 return fmt.Errorf("error whilst trying to discard frame with invalid length: %v", err) 499 } 500 return ErrFrameTooBig 501 } 502 503 if cap(f.readBuffer) >= head.length { 504 f.rbuf = f.readBuffer[:head.length] 505 } else { 506 f.readBuffer = make([]byte, head.length) 507 f.rbuf = f.readBuffer 508 } 509 510 // assume the underlying reader takes care of timeouts and retries 511 n, err := io.ReadFull(f.r, f.rbuf) 512 if err != nil { 513 return fmt.Errorf("unable to read frame body: read %d/%d bytes: %v", n, head.length, err) 514 } 515 516 if head.flags&flagCompress == flagCompress { 517 if f.compres == nil { 518 return NewErrProtocol("no compressor available with compressed frame body") 519 } 520 521 f.rbuf, err = f.compres.Decode(f.rbuf) 522 if err != nil { 523 return err 524 } 525 } 526 527 f.header = head 528 return nil 529} 530 531func (f *framer) parseFrame() (frame frame, err error) { 532 defer func() { 533 if r := recover(); r != nil { 534 if _, ok := r.(runtime.Error); ok { 535 panic(r) 536 } 537 err = r.(error) 538 } 539 }() 540 541 if f.header.version.request() { 542 return nil, NewErrProtocol("got a request frame from server: %v", f.header.version) 543 } 544 545 if f.header.flags&flagTracing == flagTracing { 546 f.readTrace() 547 } 548 549 if f.header.flags&flagWarning == flagWarning { 550 f.header.warnings = f.readStringList() 551 } 552 553 if f.header.flags&flagCustomPayload == flagCustomPayload { 554 f.customPayload = f.readBytesMap() 555 } 556 557 // assumes that the frame body has been read into rbuf 558 switch f.header.op { 559 case opError: 560 frame = f.parseErrorFrame() 561 case opReady: 562 frame = f.parseReadyFrame() 563 case opResult: 564 frame, err = f.parseResultFrame() 565 case opSupported: 566 frame = f.parseSupportedFrame() 567 case opAuthenticate: 568 frame = f.parseAuthenticateFrame() 569 case opAuthChallenge: 570 frame = f.parseAuthChallengeFrame() 571 case opAuthSuccess: 572 frame = f.parseAuthSuccessFrame() 573 case opEvent: 574 frame = f.parseEventFrame() 575 default: 576 return nil, NewErrProtocol("unknown op in frame header: %s", f.header.op) 577 } 578 579 return 580} 581 582func (f *framer) parseErrorFrame() frame { 583 code := f.readInt() 584 msg := f.readString() 585 586 errD := errorFrame{ 587 frameHeader: *f.header, 588 code: code, 589 message: msg, 590 } 591 592 switch code { 593 case errUnavailable: 594 cl := f.readConsistency() 595 required := f.readInt() 596 alive := f.readInt() 597 return &RequestErrUnavailable{ 598 errorFrame: errD, 599 Consistency: cl, 600 Required: required, 601 Alive: alive, 602 } 603 case errWriteTimeout: 604 cl := f.readConsistency() 605 received := f.readInt() 606 blockfor := f.readInt() 607 writeType := f.readString() 608 return &RequestErrWriteTimeout{ 609 errorFrame: errD, 610 Consistency: cl, 611 Received: received, 612 BlockFor: blockfor, 613 WriteType: writeType, 614 } 615 case errReadTimeout: 616 cl := f.readConsistency() 617 received := f.readInt() 618 blockfor := f.readInt() 619 dataPresent := f.readByte() 620 return &RequestErrReadTimeout{ 621 errorFrame: errD, 622 Consistency: cl, 623 Received: received, 624 BlockFor: blockfor, 625 DataPresent: dataPresent, 626 } 627 case errAlreadyExists: 628 ks := f.readString() 629 table := f.readString() 630 return &RequestErrAlreadyExists{ 631 errorFrame: errD, 632 Keyspace: ks, 633 Table: table, 634 } 635 case errUnprepared: 636 stmtId := f.readShortBytes() 637 return &RequestErrUnprepared{ 638 errorFrame: errD, 639 StatementId: copyBytes(stmtId), // defensively copy 640 } 641 case errReadFailure: 642 res := &RequestErrReadFailure{ 643 errorFrame: errD, 644 } 645 res.Consistency = f.readConsistency() 646 res.Received = f.readInt() 647 res.BlockFor = f.readInt() 648 if f.proto > protoVersion4 { 649 res.ErrorMap = f.readErrorMap() 650 res.NumFailures = len(res.ErrorMap) 651 } else { 652 res.NumFailures = f.readInt() 653 } 654 res.DataPresent = f.readByte() != 0 655 656 return res 657 case errWriteFailure: 658 res := &RequestErrWriteFailure{ 659 errorFrame: errD, 660 } 661 res.Consistency = f.readConsistency() 662 res.Received = f.readInt() 663 res.BlockFor = f.readInt() 664 if f.proto > protoVersion4 { 665 res.ErrorMap = f.readErrorMap() 666 res.NumFailures = len(res.ErrorMap) 667 } else { 668 res.NumFailures = f.readInt() 669 } 670 res.WriteType = f.readString() 671 return res 672 case errFunctionFailure: 673 res := &RequestErrFunctionFailure{ 674 errorFrame: errD, 675 } 676 res.Keyspace = f.readString() 677 res.Function = f.readString() 678 res.ArgTypes = f.readStringList() 679 return res 680 681 case errCDCWriteFailure: 682 res := &RequestErrCDCWriteFailure{ 683 errorFrame: errD, 684 } 685 return res 686 687 case errInvalid, errBootstrapping, errConfig, errCredentials, errOverloaded, 688 errProtocol, errServer, errSyntax, errTruncate, errUnauthorized: 689 // TODO(zariel): we should have some distinct types for these errors 690 return errD 691 default: 692 panic(fmt.Errorf("unknown error code: 0x%x", errD.code)) 693 } 694} 695 696func (f *framer) readErrorMap() (errMap ErrorMap) { 697 errMap = make(ErrorMap) 698 numErrs := f.readInt() 699 for i := 0; i < numErrs; i++ { 700 ip := f.readInetAdressOnly().String() 701 errMap[ip] = f.readShort() 702 } 703 return 704} 705 706func (f *framer) writeHeader(flags byte, op frameOp, stream int) { 707 f.wbuf = f.wbuf[:0] 708 f.wbuf = append(f.wbuf, 709 f.proto, 710 flags, 711 ) 712 713 if f.proto > protoVersion2 { 714 f.wbuf = append(f.wbuf, 715 byte(stream>>8), 716 byte(stream), 717 ) 718 } else { 719 f.wbuf = append(f.wbuf, 720 byte(stream), 721 ) 722 } 723 724 // pad out length 725 f.wbuf = append(f.wbuf, 726 byte(op), 727 0, 728 0, 729 0, 730 0, 731 ) 732} 733 734func (f *framer) setLength(length int) { 735 p := 4 736 if f.proto > protoVersion2 { 737 p = 5 738 } 739 740 f.wbuf[p+0] = byte(length >> 24) 741 f.wbuf[p+1] = byte(length >> 16) 742 f.wbuf[p+2] = byte(length >> 8) 743 f.wbuf[p+3] = byte(length) 744} 745 746func (f *framer) finishWrite() error { 747 if len(f.wbuf) > maxFrameSize { 748 // huge app frame, lets remove it so it doesn't bloat the heap 749 f.wbuf = make([]byte, defaultBufSize) 750 return ErrFrameTooBig 751 } 752 753 if f.wbuf[1]&flagCompress == flagCompress { 754 if f.compres == nil { 755 panic("compress flag set with no compressor") 756 } 757 758 // TODO: only compress frames which are big enough 759 compressed, err := f.compres.Encode(f.wbuf[f.headSize:]) 760 if err != nil { 761 return err 762 } 763 764 f.wbuf = append(f.wbuf[:f.headSize], compressed...) 765 } 766 length := len(f.wbuf) - f.headSize 767 f.setLength(length) 768 769 _, err := f.w.Write(f.wbuf) 770 if err != nil { 771 return err 772 } 773 774 return nil 775} 776 777func (f *framer) readTrace() { 778 f.traceID = f.readUUID().Bytes() 779} 780 781type readyFrame struct { 782 frameHeader 783} 784 785func (f *framer) parseReadyFrame() frame { 786 return &readyFrame{ 787 frameHeader: *f.header, 788 } 789} 790 791type supportedFrame struct { 792 frameHeader 793 794 supported map[string][]string 795} 796 797// TODO: if we move the body buffer onto the frameHeader then we only need a single 798// framer, and can move the methods onto the header. 799func (f *framer) parseSupportedFrame() frame { 800 return &supportedFrame{ 801 frameHeader: *f.header, 802 803 supported: f.readStringMultiMap(), 804 } 805} 806 807type writeStartupFrame struct { 808 opts map[string]string 809} 810 811func (w writeStartupFrame) String() string { 812 return fmt.Sprintf("[startup opts=%+v]", w.opts) 813} 814 815func (w *writeStartupFrame) writeFrame(f *framer, streamID int) error { 816 f.writeHeader(f.flags&^flagCompress, opStartup, streamID) 817 f.writeStringMap(w.opts) 818 819 return f.finishWrite() 820} 821 822type writePrepareFrame struct { 823 statement string 824 keyspace string 825 customPayload map[string][]byte 826} 827 828func (w *writePrepareFrame) writeFrame(f *framer, streamID int) error { 829 if len(w.customPayload) > 0 { 830 f.payload() 831 } 832 f.writeHeader(f.flags, opPrepare, streamID) 833 f.writeCustomPayload(&w.customPayload) 834 f.writeLongString(w.statement) 835 836 var flags uint32 = 0 837 if w.keyspace != "" { 838 if f.proto > protoVersion4 { 839 flags |= flagWithPreparedKeyspace 840 } else { 841 panic(fmt.Errorf("the keyspace can only be set with protocol 5 or higher")) 842 } 843 } 844 if f.proto > protoVersion4 { 845 f.writeUint(flags) 846 } 847 if w.keyspace != "" { 848 f.writeString(w.keyspace) 849 } 850 851 return f.finishWrite() 852} 853 854func (f *framer) readTypeInfo() TypeInfo { 855 // TODO: factor this out so the same code paths can be used to parse custom 856 // types and other types, as much of the logic will be duplicated. 857 id := f.readShort() 858 859 simple := NativeType{ 860 proto: f.proto, 861 typ: Type(id), 862 } 863 864 if simple.typ == TypeCustom { 865 simple.custom = f.readString() 866 if cassType := getApacheCassandraType(simple.custom); cassType != TypeCustom { 867 simple.typ = cassType 868 } 869 } 870 871 switch simple.typ { 872 case TypeTuple: 873 n := f.readShort() 874 tuple := TupleTypeInfo{ 875 NativeType: simple, 876 Elems: make([]TypeInfo, n), 877 } 878 879 for i := 0; i < int(n); i++ { 880 tuple.Elems[i] = f.readTypeInfo() 881 } 882 883 return tuple 884 885 case TypeUDT: 886 udt := UDTTypeInfo{ 887 NativeType: simple, 888 } 889 udt.KeySpace = f.readString() 890 udt.Name = f.readString() 891 892 n := f.readShort() 893 udt.Elements = make([]UDTField, n) 894 for i := 0; i < int(n); i++ { 895 field := &udt.Elements[i] 896 field.Name = f.readString() 897 field.Type = f.readTypeInfo() 898 } 899 900 return udt 901 case TypeMap, TypeList, TypeSet: 902 collection := CollectionType{ 903 NativeType: simple, 904 } 905 906 if simple.typ == TypeMap { 907 collection.Key = f.readTypeInfo() 908 } 909 910 collection.Elem = f.readTypeInfo() 911 912 return collection 913 } 914 915 return simple 916} 917 918type preparedMetadata struct { 919 resultMetadata 920 921 // proto v4+ 922 pkeyColumns []int 923} 924 925func (r preparedMetadata) String() string { 926 return fmt.Sprintf("[prepared flags=0x%x pkey=%v paging_state=% X columns=%v col_count=%d actual_col_count=%d]", r.flags, r.pkeyColumns, r.pagingState, r.columns, r.colCount, r.actualColCount) 927} 928 929func (f *framer) parsePreparedMetadata() preparedMetadata { 930 // TODO: deduplicate this from parseMetadata 931 meta := preparedMetadata{} 932 933 meta.flags = f.readInt() 934 meta.colCount = f.readInt() 935 if meta.colCount < 0 { 936 panic(fmt.Errorf("received negative column count: %d", meta.colCount)) 937 } 938 meta.actualColCount = meta.colCount 939 940 if f.proto >= protoVersion4 { 941 pkeyCount := f.readInt() 942 pkeys := make([]int, pkeyCount) 943 for i := 0; i < pkeyCount; i++ { 944 pkeys[i] = int(f.readShort()) 945 } 946 meta.pkeyColumns = pkeys 947 } 948 949 if meta.flags&flagHasMorePages == flagHasMorePages { 950 meta.pagingState = copyBytes(f.readBytes()) 951 } 952 953 if meta.flags&flagNoMetaData == flagNoMetaData { 954 return meta 955 } 956 957 var keyspace, table string 958 globalSpec := meta.flags&flagGlobalTableSpec == flagGlobalTableSpec 959 if globalSpec { 960 keyspace = f.readString() 961 table = f.readString() 962 } 963 964 var cols []ColumnInfo 965 if meta.colCount < 1000 { 966 // preallocate columninfo to avoid excess copying 967 cols = make([]ColumnInfo, meta.colCount) 968 for i := 0; i < meta.colCount; i++ { 969 f.readCol(&cols[i], &meta.resultMetadata, globalSpec, keyspace, table) 970 } 971 } else { 972 // use append, huge number of columns usually indicates a corrupt frame or 973 // just a huge row. 974 for i := 0; i < meta.colCount; i++ { 975 var col ColumnInfo 976 f.readCol(&col, &meta.resultMetadata, globalSpec, keyspace, table) 977 cols = append(cols, col) 978 } 979 } 980 981 meta.columns = cols 982 983 return meta 984} 985 986type resultMetadata struct { 987 flags int 988 989 // only if flagPageState 990 pagingState []byte 991 992 columns []ColumnInfo 993 colCount int 994 995 // this is a count of the total number of columns which can be scanned, 996 // it is at minimum len(columns) but may be larger, for instance when a column 997 // is a UDT or tuple. 998 actualColCount int 999} 1000 1001func (r *resultMetadata) morePages() bool { 1002 return r.flags&flagHasMorePages == flagHasMorePages 1003} 1004 1005func (r resultMetadata) String() string { 1006 return fmt.Sprintf("[metadata flags=0x%x paging_state=% X columns=%v]", r.flags, r.pagingState, r.columns) 1007} 1008 1009func (f *framer) readCol(col *ColumnInfo, meta *resultMetadata, globalSpec bool, keyspace, table string) { 1010 if !globalSpec { 1011 col.Keyspace = f.readString() 1012 col.Table = f.readString() 1013 } else { 1014 col.Keyspace = keyspace 1015 col.Table = table 1016 } 1017 1018 col.Name = f.readString() 1019 col.TypeInfo = f.readTypeInfo() 1020 switch v := col.TypeInfo.(type) { 1021 // maybe also UDT 1022 case TupleTypeInfo: 1023 // -1 because we already included the tuple column 1024 meta.actualColCount += len(v.Elems) - 1 1025 } 1026} 1027 1028func (f *framer) parseResultMetadata() resultMetadata { 1029 var meta resultMetadata 1030 1031 meta.flags = f.readInt() 1032 meta.colCount = f.readInt() 1033 if meta.colCount < 0 { 1034 panic(fmt.Errorf("received negative column count: %d", meta.colCount)) 1035 } 1036 meta.actualColCount = meta.colCount 1037 1038 if meta.flags&flagHasMorePages == flagHasMorePages { 1039 meta.pagingState = copyBytes(f.readBytes()) 1040 } 1041 1042 if meta.flags&flagNoMetaData == flagNoMetaData { 1043 return meta 1044 } 1045 1046 var keyspace, table string 1047 globalSpec := meta.flags&flagGlobalTableSpec == flagGlobalTableSpec 1048 if globalSpec { 1049 keyspace = f.readString() 1050 table = f.readString() 1051 } 1052 1053 var cols []ColumnInfo 1054 if meta.colCount < 1000 { 1055 // preallocate columninfo to avoid excess copying 1056 cols = make([]ColumnInfo, meta.colCount) 1057 for i := 0; i < meta.colCount; i++ { 1058 f.readCol(&cols[i], &meta, globalSpec, keyspace, table) 1059 } 1060 1061 } else { 1062 // use append, huge number of columns usually indicates a corrupt frame or 1063 // just a huge row. 1064 for i := 0; i < meta.colCount; i++ { 1065 var col ColumnInfo 1066 f.readCol(&col, &meta, globalSpec, keyspace, table) 1067 cols = append(cols, col) 1068 } 1069 } 1070 1071 meta.columns = cols 1072 1073 return meta 1074} 1075 1076type resultVoidFrame struct { 1077 frameHeader 1078} 1079 1080func (f *resultVoidFrame) String() string { 1081 return "[result_void]" 1082} 1083 1084func (f *framer) parseResultFrame() (frame, error) { 1085 kind := f.readInt() 1086 1087 switch kind { 1088 case resultKindVoid: 1089 return &resultVoidFrame{frameHeader: *f.header}, nil 1090 case resultKindRows: 1091 return f.parseResultRows(), nil 1092 case resultKindKeyspace: 1093 return f.parseResultSetKeyspace(), nil 1094 case resultKindPrepared: 1095 return f.parseResultPrepared(), nil 1096 case resultKindSchemaChanged: 1097 return f.parseResultSchemaChange(), nil 1098 } 1099 1100 return nil, NewErrProtocol("unknown result kind: %x", kind) 1101} 1102 1103type resultRowsFrame struct { 1104 frameHeader 1105 1106 meta resultMetadata 1107 // dont parse the rows here as we only need to do it once 1108 numRows int 1109} 1110 1111func (f *resultRowsFrame) String() string { 1112 return fmt.Sprintf("[result_rows meta=%v]", f.meta) 1113} 1114 1115func (f *framer) parseResultRows() frame { 1116 result := &resultRowsFrame{} 1117 result.meta = f.parseResultMetadata() 1118 1119 result.numRows = f.readInt() 1120 if result.numRows < 0 { 1121 panic(fmt.Errorf("invalid row_count in result frame: %d", result.numRows)) 1122 } 1123 1124 return result 1125} 1126 1127type resultKeyspaceFrame struct { 1128 frameHeader 1129 keyspace string 1130} 1131 1132func (r *resultKeyspaceFrame) String() string { 1133 return fmt.Sprintf("[result_keyspace keyspace=%s]", r.keyspace) 1134} 1135 1136func (f *framer) parseResultSetKeyspace() frame { 1137 return &resultKeyspaceFrame{ 1138 frameHeader: *f.header, 1139 keyspace: f.readString(), 1140 } 1141} 1142 1143type resultPreparedFrame struct { 1144 frameHeader 1145 1146 preparedID []byte 1147 reqMeta preparedMetadata 1148 respMeta resultMetadata 1149} 1150 1151func (f *framer) parseResultPrepared() frame { 1152 frame := &resultPreparedFrame{ 1153 frameHeader: *f.header, 1154 preparedID: f.readShortBytes(), 1155 reqMeta: f.parsePreparedMetadata(), 1156 } 1157 1158 if f.proto < protoVersion2 { 1159 return frame 1160 } 1161 1162 frame.respMeta = f.parseResultMetadata() 1163 1164 return frame 1165} 1166 1167type schemaChangeKeyspace struct { 1168 frameHeader 1169 1170 change string 1171 keyspace string 1172} 1173 1174func (f schemaChangeKeyspace) String() string { 1175 return fmt.Sprintf("[event schema_change_keyspace change=%q keyspace=%q]", f.change, f.keyspace) 1176} 1177 1178type schemaChangeTable struct { 1179 frameHeader 1180 1181 change string 1182 keyspace string 1183 object string 1184} 1185 1186func (f schemaChangeTable) String() string { 1187 return fmt.Sprintf("[event schema_change change=%q keyspace=%q object=%q]", f.change, f.keyspace, f.object) 1188} 1189 1190type schemaChangeType struct { 1191 frameHeader 1192 1193 change string 1194 keyspace string 1195 object string 1196} 1197 1198type schemaChangeFunction struct { 1199 frameHeader 1200 1201 change string 1202 keyspace string 1203 name string 1204 args []string 1205} 1206 1207type schemaChangeAggregate struct { 1208 frameHeader 1209 1210 change string 1211 keyspace string 1212 name string 1213 args []string 1214} 1215 1216func (f *framer) parseResultSchemaChange() frame { 1217 if f.proto <= protoVersion2 { 1218 change := f.readString() 1219 keyspace := f.readString() 1220 table := f.readString() 1221 1222 if table != "" { 1223 return &schemaChangeTable{ 1224 frameHeader: *f.header, 1225 change: change, 1226 keyspace: keyspace, 1227 object: table, 1228 } 1229 } else { 1230 return &schemaChangeKeyspace{ 1231 frameHeader: *f.header, 1232 change: change, 1233 keyspace: keyspace, 1234 } 1235 } 1236 } else { 1237 change := f.readString() 1238 target := f.readString() 1239 1240 // TODO: could just use a separate type for each target 1241 switch target { 1242 case "KEYSPACE": 1243 frame := &schemaChangeKeyspace{ 1244 frameHeader: *f.header, 1245 change: change, 1246 } 1247 1248 frame.keyspace = f.readString() 1249 1250 return frame 1251 case "TABLE": 1252 frame := &schemaChangeTable{ 1253 frameHeader: *f.header, 1254 change: change, 1255 } 1256 1257 frame.keyspace = f.readString() 1258 frame.object = f.readString() 1259 1260 return frame 1261 case "TYPE": 1262 frame := &schemaChangeType{ 1263 frameHeader: *f.header, 1264 change: change, 1265 } 1266 1267 frame.keyspace = f.readString() 1268 frame.object = f.readString() 1269 1270 return frame 1271 case "FUNCTION": 1272 frame := &schemaChangeFunction{ 1273 frameHeader: *f.header, 1274 change: change, 1275 } 1276 1277 frame.keyspace = f.readString() 1278 frame.name = f.readString() 1279 frame.args = f.readStringList() 1280 1281 return frame 1282 case "AGGREGATE": 1283 frame := &schemaChangeAggregate{ 1284 frameHeader: *f.header, 1285 change: change, 1286 } 1287 1288 frame.keyspace = f.readString() 1289 frame.name = f.readString() 1290 frame.args = f.readStringList() 1291 1292 return frame 1293 default: 1294 panic(fmt.Errorf("gocql: unknown SCHEMA_CHANGE target: %q change: %q", target, change)) 1295 } 1296 } 1297 1298} 1299 1300type authenticateFrame struct { 1301 frameHeader 1302 1303 class string 1304} 1305 1306func (a *authenticateFrame) String() string { 1307 return fmt.Sprintf("[authenticate class=%q]", a.class) 1308} 1309 1310func (f *framer) parseAuthenticateFrame() frame { 1311 return &authenticateFrame{ 1312 frameHeader: *f.header, 1313 class: f.readString(), 1314 } 1315} 1316 1317type authSuccessFrame struct { 1318 frameHeader 1319 1320 data []byte 1321} 1322 1323func (a *authSuccessFrame) String() string { 1324 return fmt.Sprintf("[auth_success data=%q]", a.data) 1325} 1326 1327func (f *framer) parseAuthSuccessFrame() frame { 1328 return &authSuccessFrame{ 1329 frameHeader: *f.header, 1330 data: f.readBytes(), 1331 } 1332} 1333 1334type authChallengeFrame struct { 1335 frameHeader 1336 1337 data []byte 1338} 1339 1340func (a *authChallengeFrame) String() string { 1341 return fmt.Sprintf("[auth_challenge data=%q]", a.data) 1342} 1343 1344func (f *framer) parseAuthChallengeFrame() frame { 1345 return &authChallengeFrame{ 1346 frameHeader: *f.header, 1347 data: f.readBytes(), 1348 } 1349} 1350 1351type statusChangeEventFrame struct { 1352 frameHeader 1353 1354 change string 1355 host net.IP 1356 port int 1357} 1358 1359func (t statusChangeEventFrame) String() string { 1360 return fmt.Sprintf("[status_change change=%s host=%v port=%v]", t.change, t.host, t.port) 1361} 1362 1363// essentially the same as statusChange 1364type topologyChangeEventFrame struct { 1365 frameHeader 1366 1367 change string 1368 host net.IP 1369 port int 1370} 1371 1372func (t topologyChangeEventFrame) String() string { 1373 return fmt.Sprintf("[topology_change change=%s host=%v port=%v]", t.change, t.host, t.port) 1374} 1375 1376func (f *framer) parseEventFrame() frame { 1377 eventType := f.readString() 1378 1379 switch eventType { 1380 case "TOPOLOGY_CHANGE": 1381 frame := &topologyChangeEventFrame{frameHeader: *f.header} 1382 frame.change = f.readString() 1383 frame.host, frame.port = f.readInet() 1384 1385 return frame 1386 case "STATUS_CHANGE": 1387 frame := &statusChangeEventFrame{frameHeader: *f.header} 1388 frame.change = f.readString() 1389 frame.host, frame.port = f.readInet() 1390 1391 return frame 1392 case "SCHEMA_CHANGE": 1393 // this should work for all versions 1394 return f.parseResultSchemaChange() 1395 default: 1396 panic(fmt.Errorf("gocql: unknown event type: %q", eventType)) 1397 } 1398 1399} 1400 1401type writeAuthResponseFrame struct { 1402 data []byte 1403} 1404 1405func (a *writeAuthResponseFrame) String() string { 1406 return fmt.Sprintf("[auth_response data=%q]", a.data) 1407} 1408 1409func (a *writeAuthResponseFrame) writeFrame(framer *framer, streamID int) error { 1410 return framer.writeAuthResponseFrame(streamID, a.data) 1411} 1412 1413func (f *framer) writeAuthResponseFrame(streamID int, data []byte) error { 1414 f.writeHeader(f.flags, opAuthResponse, streamID) 1415 f.writeBytes(data) 1416 return f.finishWrite() 1417} 1418 1419type queryValues struct { 1420 value []byte 1421 1422 // optional name, will set With names for values flag 1423 name string 1424 isUnset bool 1425} 1426 1427type queryParams struct { 1428 consistency Consistency 1429 // v2+ 1430 skipMeta bool 1431 values []queryValues 1432 pageSize int 1433 pagingState []byte 1434 serialConsistency SerialConsistency 1435 // v3+ 1436 defaultTimestamp bool 1437 defaultTimestampValue int64 1438 // v5+ 1439 keyspace string 1440} 1441 1442func (q queryParams) String() string { 1443 return fmt.Sprintf("[query_params consistency=%v skip_meta=%v page_size=%d paging_state=%q serial_consistency=%v default_timestamp=%v values=%v keyspace=%s]", 1444 q.consistency, q.skipMeta, q.pageSize, q.pagingState, q.serialConsistency, q.defaultTimestamp, q.values, q.keyspace) 1445} 1446 1447func (f *framer) writeQueryParams(opts *queryParams) { 1448 f.writeConsistency(opts.consistency) 1449 1450 if f.proto == protoVersion1 { 1451 return 1452 } 1453 1454 var flags byte 1455 if len(opts.values) > 0 { 1456 flags |= flagValues 1457 } 1458 if opts.skipMeta { 1459 flags |= flagSkipMetaData 1460 } 1461 if opts.pageSize > 0 { 1462 flags |= flagPageSize 1463 } 1464 if len(opts.pagingState) > 0 { 1465 flags |= flagWithPagingState 1466 } 1467 if opts.serialConsistency > 0 { 1468 flags |= flagWithSerialConsistency 1469 } 1470 1471 names := false 1472 1473 // protoV3 specific things 1474 if f.proto > protoVersion2 { 1475 if opts.defaultTimestamp { 1476 flags |= flagDefaultTimestamp 1477 } 1478 1479 if len(opts.values) > 0 && opts.values[0].name != "" { 1480 flags |= flagWithNameValues 1481 names = true 1482 } 1483 } 1484 1485 if opts.keyspace != "" { 1486 if f.proto > protoVersion4 { 1487 flags |= flagWithKeyspace 1488 } else { 1489 panic(fmt.Errorf("the keyspace can only be set with protocol 5 or higher")) 1490 } 1491 } 1492 1493 if f.proto > protoVersion4 { 1494 f.writeUint(uint32(flags)) 1495 } else { 1496 f.writeByte(flags) 1497 } 1498 1499 if n := len(opts.values); n > 0 { 1500 f.writeShort(uint16(n)) 1501 1502 for i := 0; i < n; i++ { 1503 if names { 1504 f.writeString(opts.values[i].name) 1505 } 1506 if opts.values[i].isUnset { 1507 f.writeUnset() 1508 } else { 1509 f.writeBytes(opts.values[i].value) 1510 } 1511 } 1512 } 1513 1514 if opts.pageSize > 0 { 1515 f.writeInt(int32(opts.pageSize)) 1516 } 1517 1518 if len(opts.pagingState) > 0 { 1519 f.writeBytes(opts.pagingState) 1520 } 1521 1522 if opts.serialConsistency > 0 { 1523 f.writeConsistency(Consistency(opts.serialConsistency)) 1524 } 1525 1526 if f.proto > protoVersion2 && opts.defaultTimestamp { 1527 // timestamp in microseconds 1528 var ts int64 1529 if opts.defaultTimestampValue != 0 { 1530 ts = opts.defaultTimestampValue 1531 } else { 1532 ts = time.Now().UnixNano() / 1000 1533 } 1534 f.writeLong(ts) 1535 } 1536 1537 if opts.keyspace != "" { 1538 f.writeString(opts.keyspace) 1539 } 1540} 1541 1542type writeQueryFrame struct { 1543 statement string 1544 params queryParams 1545 1546 // v4+ 1547 customPayload map[string][]byte 1548} 1549 1550func (w *writeQueryFrame) String() string { 1551 return fmt.Sprintf("[query statement=%q params=%v]", w.statement, w.params) 1552} 1553 1554func (w *writeQueryFrame) writeFrame(framer *framer, streamID int) error { 1555 return framer.writeQueryFrame(streamID, w.statement, &w.params, w.customPayload) 1556} 1557 1558func (f *framer) writeQueryFrame(streamID int, statement string, params *queryParams, customPayload map[string][]byte) error { 1559 if len(customPayload) > 0 { 1560 f.payload() 1561 } 1562 f.writeHeader(f.flags, opQuery, streamID) 1563 f.writeCustomPayload(&customPayload) 1564 f.writeLongString(statement) 1565 f.writeQueryParams(params) 1566 1567 return f.finishWrite() 1568} 1569 1570type frameWriter interface { 1571 writeFrame(framer *framer, streamID int) error 1572} 1573 1574type frameWriterFunc func(framer *framer, streamID int) error 1575 1576func (f frameWriterFunc) writeFrame(framer *framer, streamID int) error { 1577 return f(framer, streamID) 1578} 1579 1580type writeExecuteFrame struct { 1581 preparedID []byte 1582 params queryParams 1583 1584 // v4+ 1585 customPayload map[string][]byte 1586} 1587 1588func (e *writeExecuteFrame) String() string { 1589 return fmt.Sprintf("[execute id=% X params=%v]", e.preparedID, &e.params) 1590} 1591 1592func (e *writeExecuteFrame) writeFrame(fr *framer, streamID int) error { 1593 return fr.writeExecuteFrame(streamID, e.preparedID, &e.params, &e.customPayload) 1594} 1595 1596func (f *framer) writeExecuteFrame(streamID int, preparedID []byte, params *queryParams, customPayload *map[string][]byte) error { 1597 if len(*customPayload) > 0 { 1598 f.payload() 1599 } 1600 f.writeHeader(f.flags, opExecute, streamID) 1601 f.writeCustomPayload(customPayload) 1602 f.writeShortBytes(preparedID) 1603 if f.proto > protoVersion1 { 1604 f.writeQueryParams(params) 1605 } else { 1606 n := len(params.values) 1607 f.writeShort(uint16(n)) 1608 for i := 0; i < n; i++ { 1609 if params.values[i].isUnset { 1610 f.writeUnset() 1611 } else { 1612 f.writeBytes(params.values[i].value) 1613 } 1614 } 1615 f.writeConsistency(params.consistency) 1616 } 1617 1618 return f.finishWrite() 1619} 1620 1621// TODO: can we replace BatchStatemt with batchStatement? As they prety much 1622// duplicate each other 1623type batchStatment struct { 1624 preparedID []byte 1625 statement string 1626 values []queryValues 1627} 1628 1629type writeBatchFrame struct { 1630 typ BatchType 1631 statements []batchStatment 1632 consistency Consistency 1633 1634 // v3+ 1635 serialConsistency SerialConsistency 1636 defaultTimestamp bool 1637 defaultTimestampValue int64 1638 1639 //v4+ 1640 customPayload map[string][]byte 1641} 1642 1643func (w *writeBatchFrame) writeFrame(framer *framer, streamID int) error { 1644 return framer.writeBatchFrame(streamID, w, w.customPayload) 1645} 1646 1647func (f *framer) writeBatchFrame(streamID int, w *writeBatchFrame, customPayload map[string][]byte) error { 1648 if len(customPayload) > 0 { 1649 f.payload() 1650 } 1651 f.writeHeader(f.flags, opBatch, streamID) 1652 f.writeCustomPayload(&customPayload) 1653 f.writeByte(byte(w.typ)) 1654 1655 n := len(w.statements) 1656 f.writeShort(uint16(n)) 1657 1658 var flags byte 1659 1660 for i := 0; i < n; i++ { 1661 b := &w.statements[i] 1662 if len(b.preparedID) == 0 { 1663 f.writeByte(0) 1664 f.writeLongString(b.statement) 1665 } else { 1666 f.writeByte(1) 1667 f.writeShortBytes(b.preparedID) 1668 } 1669 1670 f.writeShort(uint16(len(b.values))) 1671 for j := range b.values { 1672 col := b.values[j] 1673 if f.proto > protoVersion2 && col.name != "" { 1674 // TODO: move this check into the caller and set a flag on writeBatchFrame 1675 // to indicate using named values 1676 if f.proto <= protoVersion5 { 1677 return fmt.Errorf("gocql: named query values are not supported in batches, please see https://issues.apache.org/jira/browse/CASSANDRA-10246") 1678 } 1679 flags |= flagWithNameValues 1680 f.writeString(col.name) 1681 } 1682 if col.isUnset { 1683 f.writeUnset() 1684 } else { 1685 f.writeBytes(col.value) 1686 } 1687 } 1688 } 1689 1690 f.writeConsistency(w.consistency) 1691 1692 if f.proto > protoVersion2 { 1693 if w.serialConsistency > 0 { 1694 flags |= flagWithSerialConsistency 1695 } 1696 if w.defaultTimestamp { 1697 flags |= flagDefaultTimestamp 1698 } 1699 1700 if f.proto > protoVersion4 { 1701 f.writeUint(uint32(flags)) 1702 } else { 1703 f.writeByte(flags) 1704 } 1705 1706 if w.serialConsistency > 0 { 1707 f.writeConsistency(Consistency(w.serialConsistency)) 1708 } 1709 1710 if w.defaultTimestamp { 1711 var ts int64 1712 if w.defaultTimestampValue != 0 { 1713 ts = w.defaultTimestampValue 1714 } else { 1715 ts = time.Now().UnixNano() / 1000 1716 } 1717 f.writeLong(ts) 1718 } 1719 } 1720 1721 return f.finishWrite() 1722} 1723 1724type writeOptionsFrame struct{} 1725 1726func (w *writeOptionsFrame) writeFrame(framer *framer, streamID int) error { 1727 return framer.writeOptionsFrame(streamID, w) 1728} 1729 1730func (f *framer) writeOptionsFrame(stream int, _ *writeOptionsFrame) error { 1731 f.writeHeader(f.flags&^flagCompress, opOptions, stream) 1732 return f.finishWrite() 1733} 1734 1735type writeRegisterFrame struct { 1736 events []string 1737} 1738 1739func (w *writeRegisterFrame) writeFrame(framer *framer, streamID int) error { 1740 return framer.writeRegisterFrame(streamID, w) 1741} 1742 1743func (f *framer) writeRegisterFrame(streamID int, w *writeRegisterFrame) error { 1744 f.writeHeader(f.flags, opRegister, streamID) 1745 f.writeStringList(w.events) 1746 1747 return f.finishWrite() 1748} 1749 1750func (f *framer) readByte() byte { 1751 if len(f.rbuf) < 1 { 1752 panic(fmt.Errorf("not enough bytes in buffer to read byte require 1 got: %d", len(f.rbuf))) 1753 } 1754 1755 b := f.rbuf[0] 1756 f.rbuf = f.rbuf[1:] 1757 return b 1758} 1759 1760func (f *framer) readInt() (n int) { 1761 if len(f.rbuf) < 4 { 1762 panic(fmt.Errorf("not enough bytes in buffer to read int require 4 got: %d", len(f.rbuf))) 1763 } 1764 1765 n = int(int32(f.rbuf[0])<<24 | int32(f.rbuf[1])<<16 | int32(f.rbuf[2])<<8 | int32(f.rbuf[3])) 1766 f.rbuf = f.rbuf[4:] 1767 return 1768} 1769 1770func (f *framer) readShort() (n uint16) { 1771 if len(f.rbuf) < 2 { 1772 panic(fmt.Errorf("not enough bytes in buffer to read short require 2 got: %d", len(f.rbuf))) 1773 } 1774 n = uint16(f.rbuf[0])<<8 | uint16(f.rbuf[1]) 1775 f.rbuf = f.rbuf[2:] 1776 return 1777} 1778 1779func (f *framer) readString() (s string) { 1780 size := f.readShort() 1781 1782 if len(f.rbuf) < int(size) { 1783 panic(fmt.Errorf("not enough bytes in buffer to read string require %d got: %d", size, len(f.rbuf))) 1784 } 1785 1786 s = string(f.rbuf[:size]) 1787 f.rbuf = f.rbuf[size:] 1788 return 1789} 1790 1791func (f *framer) readLongString() (s string) { 1792 size := f.readInt() 1793 1794 if len(f.rbuf) < size { 1795 panic(fmt.Errorf("not enough bytes in buffer to read long string require %d got: %d", size, len(f.rbuf))) 1796 } 1797 1798 s = string(f.rbuf[:size]) 1799 f.rbuf = f.rbuf[size:] 1800 return 1801} 1802 1803func (f *framer) readUUID() *UUID { 1804 if len(f.rbuf) < 16 { 1805 panic(fmt.Errorf("not enough bytes in buffer to read uuid require %d got: %d", 16, len(f.rbuf))) 1806 } 1807 1808 // TODO: how to handle this error, if it is a uuid, then sureley, problems? 1809 u, _ := UUIDFromBytes(f.rbuf[:16]) 1810 f.rbuf = f.rbuf[16:] 1811 return &u 1812} 1813 1814func (f *framer) readStringList() []string { 1815 size := f.readShort() 1816 1817 l := make([]string, size) 1818 for i := 0; i < int(size); i++ { 1819 l[i] = f.readString() 1820 } 1821 1822 return l 1823} 1824 1825func (f *framer) readBytesInternal() ([]byte, error) { 1826 size := f.readInt() 1827 if size < 0 { 1828 return nil, nil 1829 } 1830 1831 if len(f.rbuf) < size { 1832 return nil, fmt.Errorf("not enough bytes in buffer to read bytes require %d got: %d", size, len(f.rbuf)) 1833 } 1834 1835 l := f.rbuf[:size] 1836 f.rbuf = f.rbuf[size:] 1837 1838 return l, nil 1839} 1840 1841func (f *framer) readBytes() []byte { 1842 l, err := f.readBytesInternal() 1843 if err != nil { 1844 panic(err) 1845 } 1846 1847 return l 1848} 1849 1850func (f *framer) readShortBytes() []byte { 1851 size := f.readShort() 1852 if len(f.rbuf) < int(size) { 1853 panic(fmt.Errorf("not enough bytes in buffer to read short bytes: require %d got %d", size, len(f.rbuf))) 1854 } 1855 1856 l := f.rbuf[:size] 1857 f.rbuf = f.rbuf[size:] 1858 1859 return l 1860} 1861 1862func (f *framer) readInetAdressOnly() net.IP { 1863 if len(f.rbuf) < 1 { 1864 panic(fmt.Errorf("not enough bytes in buffer to read inet size require %d got: %d", 1, len(f.rbuf))) 1865 } 1866 1867 size := f.rbuf[0] 1868 f.rbuf = f.rbuf[1:] 1869 1870 if !(size == 4 || size == 16) { 1871 panic(fmt.Errorf("invalid IP size: %d", size)) 1872 } 1873 1874 if len(f.rbuf) < 1 { 1875 panic(fmt.Errorf("not enough bytes in buffer to read inet require %d got: %d", size, len(f.rbuf))) 1876 } 1877 1878 ip := make([]byte, size) 1879 copy(ip, f.rbuf[:size]) 1880 f.rbuf = f.rbuf[size:] 1881 return net.IP(ip) 1882} 1883 1884func (f *framer) readInet() (net.IP, int) { 1885 return f.readInetAdressOnly(), f.readInt() 1886} 1887 1888func (f *framer) readConsistency() Consistency { 1889 return Consistency(f.readShort()) 1890} 1891 1892func (f *framer) readBytesMap() map[string][]byte { 1893 size := f.readShort() 1894 m := make(map[string][]byte, size) 1895 1896 for i := 0; i < int(size); i++ { 1897 k := f.readString() 1898 v := f.readBytes() 1899 m[k] = v 1900 } 1901 1902 return m 1903} 1904 1905func (f *framer) readStringMultiMap() map[string][]string { 1906 size := f.readShort() 1907 m := make(map[string][]string, size) 1908 1909 for i := 0; i < int(size); i++ { 1910 k := f.readString() 1911 v := f.readStringList() 1912 m[k] = v 1913 } 1914 1915 return m 1916} 1917 1918func (f *framer) writeByte(b byte) { 1919 f.wbuf = append(f.wbuf, b) 1920} 1921 1922func appendBytes(p []byte, d []byte) []byte { 1923 if d == nil { 1924 return appendInt(p, -1) 1925 } 1926 p = appendInt(p, int32(len(d))) 1927 p = append(p, d...) 1928 return p 1929} 1930 1931func appendShort(p []byte, n uint16) []byte { 1932 return append(p, 1933 byte(n>>8), 1934 byte(n), 1935 ) 1936} 1937 1938func appendInt(p []byte, n int32) []byte { 1939 return append(p, byte(n>>24), 1940 byte(n>>16), 1941 byte(n>>8), 1942 byte(n)) 1943} 1944 1945func appendUint(p []byte, n uint32) []byte { 1946 return append(p, byte(n>>24), 1947 byte(n>>16), 1948 byte(n>>8), 1949 byte(n)) 1950} 1951 1952func appendLong(p []byte, n int64) []byte { 1953 return append(p, 1954 byte(n>>56), 1955 byte(n>>48), 1956 byte(n>>40), 1957 byte(n>>32), 1958 byte(n>>24), 1959 byte(n>>16), 1960 byte(n>>8), 1961 byte(n), 1962 ) 1963} 1964 1965func (f *framer) writeCustomPayload(customPayload *map[string][]byte) { 1966 if len(*customPayload) > 0 { 1967 if f.proto < protoVersion4 { 1968 panic("Custom payload is not supported with version V3 or less") 1969 } 1970 f.writeBytesMap(*customPayload) 1971 } 1972} 1973 1974// these are protocol level binary types 1975func (f *framer) writeInt(n int32) { 1976 f.wbuf = appendInt(f.wbuf, n) 1977} 1978 1979func (f *framer) writeUint(n uint32) { 1980 f.wbuf = appendUint(f.wbuf, n) 1981} 1982 1983func (f *framer) writeShort(n uint16) { 1984 f.wbuf = appendShort(f.wbuf, n) 1985} 1986 1987func (f *framer) writeLong(n int64) { 1988 f.wbuf = appendLong(f.wbuf, n) 1989} 1990 1991func (f *framer) writeString(s string) { 1992 f.writeShort(uint16(len(s))) 1993 f.wbuf = append(f.wbuf, s...) 1994} 1995 1996func (f *framer) writeLongString(s string) { 1997 f.writeInt(int32(len(s))) 1998 f.wbuf = append(f.wbuf, s...) 1999} 2000 2001func (f *framer) writeStringList(l []string) { 2002 f.writeShort(uint16(len(l))) 2003 for _, s := range l { 2004 f.writeString(s) 2005 } 2006} 2007 2008func (f *framer) writeUnset() { 2009 // Protocol version 4 specifies that bind variables do not require having a 2010 // value when executing a statement. Bind variables without a value are 2011 // called 'unset'. The 'unset' bind variable is serialized as the int 2012 // value '-2' without following bytes. 2013 f.writeInt(-2) 2014} 2015 2016func (f *framer) writeBytes(p []byte) { 2017 // TODO: handle null case correctly, 2018 // [bytes] A [int] n, followed by n bytes if n >= 0. If n < 0, 2019 // no byte should follow and the value represented is `null`. 2020 if p == nil { 2021 f.writeInt(-1) 2022 } else { 2023 f.writeInt(int32(len(p))) 2024 f.wbuf = append(f.wbuf, p...) 2025 } 2026} 2027 2028func (f *framer) writeShortBytes(p []byte) { 2029 f.writeShort(uint16(len(p))) 2030 f.wbuf = append(f.wbuf, p...) 2031} 2032 2033func (f *framer) writeConsistency(cons Consistency) { 2034 f.writeShort(uint16(cons)) 2035} 2036 2037func (f *framer) writeStringMap(m map[string]string) { 2038 f.writeShort(uint16(len(m))) 2039 for k, v := range m { 2040 f.writeString(k) 2041 f.writeString(v) 2042 } 2043} 2044 2045func (f *framer) writeBytesMap(m map[string][]byte) { 2046 f.writeShort(uint16(len(m))) 2047 for k, v := range m { 2048 f.writeString(k) 2049 f.writeBytes(v) 2050 } 2051} 2052