1// Copyright (c) 2012 The gocql Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package gocql
6
7import (
8	"context"
9	"errors"
10	"fmt"
11	"io"
12	"io/ioutil"
13	"net"
14	"runtime"
15	"strings"
16	"time"
17)
18
19type unsetColumn struct{}
20
21// UnsetValue represents a value used in a query binding that will be ignored by Cassandra.
22//
23// By setting a field to the unset value Cassandra will ignore the write completely.
24// The main advantage is the ability to keep the same prepared statement even when you don't
25// want to update some fields, where before you needed to make another prepared statement.
26//
27// UnsetValue is only available when using the version 4 of the protocol.
28var UnsetValue = unsetColumn{}
29
30type namedValue struct {
31	name  string
32	value interface{}
33}
34
35// NamedValue produce a value which will bind to the named parameter in a query
36func NamedValue(name string, value interface{}) interface{} {
37	return &namedValue{
38		name:  name,
39		value: value,
40	}
41}
42
43const (
44	protoDirectionMask = 0x80
45	protoVersionMask   = 0x7F
46	protoVersion1      = 0x01
47	protoVersion2      = 0x02
48	protoVersion3      = 0x03
49	protoVersion4      = 0x04
50	protoVersion5      = 0x05
51
52	maxFrameSize = 256 * 1024 * 1024
53)
54
55type protoVersion byte
56
57func (p protoVersion) request() bool {
58	return p&protoDirectionMask == 0x00
59}
60
61func (p protoVersion) response() bool {
62	return p&protoDirectionMask == 0x80
63}
64
65func (p protoVersion) version() byte {
66	return byte(p) & protoVersionMask
67}
68
69func (p protoVersion) String() string {
70	dir := "REQ"
71	if p.response() {
72		dir = "RESP"
73	}
74
75	return fmt.Sprintf("[version=%d direction=%s]", p.version(), dir)
76}
77
78type frameOp byte
79
80const (
81	// header ops
82	opError         frameOp = 0x00
83	opStartup       frameOp = 0x01
84	opReady         frameOp = 0x02
85	opAuthenticate  frameOp = 0x03
86	opOptions       frameOp = 0x05
87	opSupported     frameOp = 0x06
88	opQuery         frameOp = 0x07
89	opResult        frameOp = 0x08
90	opPrepare       frameOp = 0x09
91	opExecute       frameOp = 0x0A
92	opRegister      frameOp = 0x0B
93	opEvent         frameOp = 0x0C
94	opBatch         frameOp = 0x0D
95	opAuthChallenge frameOp = 0x0E
96	opAuthResponse  frameOp = 0x0F
97	opAuthSuccess   frameOp = 0x10
98)
99
100func (f frameOp) String() string {
101	switch f {
102	case opError:
103		return "ERROR"
104	case opStartup:
105		return "STARTUP"
106	case opReady:
107		return "READY"
108	case opAuthenticate:
109		return "AUTHENTICATE"
110	case opOptions:
111		return "OPTIONS"
112	case opSupported:
113		return "SUPPORTED"
114	case opQuery:
115		return "QUERY"
116	case opResult:
117		return "RESULT"
118	case opPrepare:
119		return "PREPARE"
120	case opExecute:
121		return "EXECUTE"
122	case opRegister:
123		return "REGISTER"
124	case opEvent:
125		return "EVENT"
126	case opBatch:
127		return "BATCH"
128	case opAuthChallenge:
129		return "AUTH_CHALLENGE"
130	case opAuthResponse:
131		return "AUTH_RESPONSE"
132	case opAuthSuccess:
133		return "AUTH_SUCCESS"
134	default:
135		return fmt.Sprintf("UNKNOWN_OP_%d", f)
136	}
137}
138
139const (
140	// result kind
141	resultKindVoid          = 1
142	resultKindRows          = 2
143	resultKindKeyspace      = 3
144	resultKindPrepared      = 4
145	resultKindSchemaChanged = 5
146
147	// rows flags
148	flagGlobalTableSpec int = 0x01
149	flagHasMorePages    int = 0x02
150	flagNoMetaData      int = 0x04
151
152	// query flags
153	flagValues                byte = 0x01
154	flagSkipMetaData          byte = 0x02
155	flagPageSize              byte = 0x04
156	flagWithPagingState       byte = 0x08
157	flagWithSerialConsistency byte = 0x10
158	flagDefaultTimestamp      byte = 0x20
159	flagWithNameValues        byte = 0x40
160	flagWithKeyspace          byte = 0x80
161
162	// prepare flags
163	flagWithPreparedKeyspace uint32 = 0x01
164
165	// header flags
166	flagCompress      byte = 0x01
167	flagTracing       byte = 0x02
168	flagCustomPayload byte = 0x04
169	flagWarning       byte = 0x08
170	flagBetaProtocol  byte = 0x10
171)
172
173type Consistency uint16
174
175const (
176	Any         Consistency = 0x00
177	One         Consistency = 0x01
178	Two         Consistency = 0x02
179	Three       Consistency = 0x03
180	Quorum      Consistency = 0x04
181	All         Consistency = 0x05
182	LocalQuorum Consistency = 0x06
183	EachQuorum  Consistency = 0x07
184	LocalOne    Consistency = 0x0A
185)
186
187func (c Consistency) String() string {
188	switch c {
189	case Any:
190		return "ANY"
191	case One:
192		return "ONE"
193	case Two:
194		return "TWO"
195	case Three:
196		return "THREE"
197	case Quorum:
198		return "QUORUM"
199	case All:
200		return "ALL"
201	case LocalQuorum:
202		return "LOCAL_QUORUM"
203	case EachQuorum:
204		return "EACH_QUORUM"
205	case LocalOne:
206		return "LOCAL_ONE"
207	default:
208		return fmt.Sprintf("UNKNOWN_CONS_0x%x", uint16(c))
209	}
210}
211
212func (c Consistency) MarshalText() (text []byte, err error) {
213	return []byte(c.String()), nil
214}
215
216func (c *Consistency) UnmarshalText(text []byte) error {
217	switch string(text) {
218	case "ANY":
219		*c = Any
220	case "ONE":
221		*c = One
222	case "TWO":
223		*c = Two
224	case "THREE":
225		*c = Three
226	case "QUORUM":
227		*c = Quorum
228	case "ALL":
229		*c = All
230	case "LOCAL_QUORUM":
231		*c = LocalQuorum
232	case "EACH_QUORUM":
233		*c = EachQuorum
234	case "LOCAL_ONE":
235		*c = LocalOne
236	default:
237		return fmt.Errorf("invalid consistency %q", string(text))
238	}
239
240	return nil
241}
242
243func ParseConsistency(s string) Consistency {
244	var c Consistency
245	if err := c.UnmarshalText([]byte(strings.ToUpper(s))); err != nil {
246		panic(err)
247	}
248	return c
249}
250
251// ParseConsistencyWrapper wraps gocql.ParseConsistency to provide an err
252// return instead of a panic
253func ParseConsistencyWrapper(s string) (consistency Consistency, err error) {
254	err = consistency.UnmarshalText([]byte(strings.ToUpper(s)))
255	return
256}
257
258// MustParseConsistency is the same as ParseConsistency except it returns
259// an error (never). It is kept here since breaking changes are not good.
260// DEPRECATED: use ParseConsistency if you want a panic on parse error.
261func MustParseConsistency(s string) (Consistency, error) {
262	c, err := ParseConsistencyWrapper(s)
263	if err != nil {
264		panic(err)
265	}
266	return c, nil
267}
268
269type SerialConsistency uint16
270
271const (
272	Serial      SerialConsistency = 0x08
273	LocalSerial SerialConsistency = 0x09
274)
275
276func (s SerialConsistency) String() string {
277	switch s {
278	case Serial:
279		return "SERIAL"
280	case LocalSerial:
281		return "LOCAL_SERIAL"
282	default:
283		return fmt.Sprintf("UNKNOWN_SERIAL_CONS_0x%x", uint16(s))
284	}
285}
286
287func (s SerialConsistency) MarshalText() (text []byte, err error) {
288	return []byte(s.String()), nil
289}
290
291func (s *SerialConsistency) UnmarshalText(text []byte) error {
292	switch string(text) {
293	case "SERIAL":
294		*s = Serial
295	case "LOCAL_SERIAL":
296		*s = LocalSerial
297	default:
298		return fmt.Errorf("invalid consistency %q", string(text))
299	}
300
301	return nil
302}
303
304const (
305	apacheCassandraTypePrefix = "org.apache.cassandra.db.marshal."
306)
307
308var (
309	ErrFrameTooBig = errors.New("frame length is bigger than the maximum allowed")
310)
311
312const maxFrameHeaderSize = 9
313
314func readInt(p []byte) int32 {
315	return int32(p[0])<<24 | int32(p[1])<<16 | int32(p[2])<<8 | int32(p[3])
316}
317
318type frameHeader struct {
319	version  protoVersion
320	flags    byte
321	stream   int
322	op       frameOp
323	length   int
324	warnings []string
325}
326
327func (f frameHeader) String() string {
328	return fmt.Sprintf("[header version=%s flags=0x%x stream=%d op=%s length=%d]", f.version, f.flags, f.stream, f.op, f.length)
329}
330
331func (f frameHeader) Header() frameHeader {
332	return f
333}
334
335const defaultBufSize = 128
336
337type ObservedFrameHeader struct {
338	Version protoVersion
339	Flags   byte
340	Stream  int16
341	Opcode  frameOp
342	Length  int32
343
344	// StartHeader is the time we started reading the frame header off the network connection.
345	Start time.Time
346	// EndHeader is the time we finished reading the frame header off the network connection.
347	End time.Time
348
349	// Host is Host of the connection the frame header was read from.
350	Host *HostInfo
351}
352
353func (f ObservedFrameHeader) String() string {
354	return fmt.Sprintf("[observed header version=%s flags=0x%x stream=%d op=%s length=%d]", f.Version, f.Flags, f.Stream, f.Opcode, f.Length)
355}
356
357// FrameHeaderObserver is the interface implemented by frame observers / stat collectors.
358//
359// Experimental, this interface and use may change
360type FrameHeaderObserver interface {
361	// ObserveFrameHeader gets called on every received frame header.
362	ObserveFrameHeader(context.Context, ObservedFrameHeader)
363}
364
365// a framer is responsible for reading, writing and parsing frames on a single stream
366type framer struct {
367	r io.Reader
368	w io.Writer
369
370	proto byte
371	// flags are for outgoing flags, enabling compression and tracing etc
372	flags    byte
373	compres  Compressor
374	headSize int
375	// if this frame was read then the header will be here
376	header *frameHeader
377
378	// if tracing flag is set this is not nil
379	traceID []byte
380
381	// holds a ref to the whole byte slice for rbuf so that it can be reset to
382	// 0 after a read.
383	readBuffer []byte
384
385	rbuf []byte
386	wbuf []byte
387
388	customPayload map[string][]byte
389}
390
391func newFramer(r io.Reader, w io.Writer, compressor Compressor, version byte) *framer {
392	f := &framer{
393		wbuf:       make([]byte, defaultBufSize),
394		readBuffer: make([]byte, defaultBufSize),
395	}
396	var flags byte
397	if compressor != nil {
398		flags |= flagCompress
399	}
400	if version == protoVersion5 {
401		flags |= flagBetaProtocol
402	}
403
404	version &= protoVersionMask
405
406	headSize := 8
407	if version > protoVersion2 {
408		headSize = 9
409	}
410
411	f.compres = compressor
412	f.proto = version
413	f.flags = flags
414	f.headSize = headSize
415
416	f.r = r
417	f.rbuf = f.readBuffer[:0]
418
419	f.w = w
420	f.wbuf = f.wbuf[:0]
421
422	f.header = nil
423	f.traceID = nil
424
425	return f
426}
427
428type frame interface {
429	Header() frameHeader
430}
431
432func readHeader(r io.Reader, p []byte) (head frameHeader, err error) {
433	_, err = io.ReadFull(r, p[:1])
434	if err != nil {
435		return frameHeader{}, err
436	}
437
438	version := p[0] & protoVersionMask
439
440	if version < protoVersion1 || version > protoVersion5 {
441		return frameHeader{}, fmt.Errorf("gocql: unsupported protocol response version: %d", version)
442	}
443
444	headSize := 9
445	if version < protoVersion3 {
446		headSize = 8
447	}
448
449	_, err = io.ReadFull(r, p[1:headSize])
450	if err != nil {
451		return frameHeader{}, err
452	}
453
454	p = p[:headSize]
455
456	head.version = protoVersion(p[0])
457	head.flags = p[1]
458
459	if version > protoVersion2 {
460		if len(p) != 9 {
461			return frameHeader{}, fmt.Errorf("not enough bytes to read header require 9 got: %d", len(p))
462		}
463
464		head.stream = int(int16(p[2])<<8 | int16(p[3]))
465		head.op = frameOp(p[4])
466		head.length = int(readInt(p[5:]))
467	} else {
468		if len(p) != 8 {
469			return frameHeader{}, fmt.Errorf("not enough bytes to read header require 8 got: %d", len(p))
470		}
471
472		head.stream = int(int8(p[2]))
473		head.op = frameOp(p[3])
474		head.length = int(readInt(p[4:]))
475	}
476
477	return head, nil
478}
479
480// explicitly enables tracing for the framers outgoing requests
481func (f *framer) trace() {
482	f.flags |= flagTracing
483}
484
485// explicitly enables the custom payload flag
486func (f *framer) payload() {
487	f.flags |= flagCustomPayload
488}
489
490// reads a frame form the wire into the framers buffer
491func (f *framer) readFrame(head *frameHeader) error {
492	if head.length < 0 {
493		return fmt.Errorf("frame body length can not be less than 0: %d", head.length)
494	} else if head.length > maxFrameSize {
495		// need to free up the connection to be used again
496		_, err := io.CopyN(ioutil.Discard, f.r, int64(head.length))
497		if err != nil {
498			return fmt.Errorf("error whilst trying to discard frame with invalid length: %v", err)
499		}
500		return ErrFrameTooBig
501	}
502
503	if cap(f.readBuffer) >= head.length {
504		f.rbuf = f.readBuffer[:head.length]
505	} else {
506		f.readBuffer = make([]byte, head.length)
507		f.rbuf = f.readBuffer
508	}
509
510	// assume the underlying reader takes care of timeouts and retries
511	n, err := io.ReadFull(f.r, f.rbuf)
512	if err != nil {
513		return fmt.Errorf("unable to read frame body: read %d/%d bytes: %v", n, head.length, err)
514	}
515
516	if head.flags&flagCompress == flagCompress {
517		if f.compres == nil {
518			return NewErrProtocol("no compressor available with compressed frame body")
519		}
520
521		f.rbuf, err = f.compres.Decode(f.rbuf)
522		if err != nil {
523			return err
524		}
525	}
526
527	f.header = head
528	return nil
529}
530
531func (f *framer) parseFrame() (frame frame, err error) {
532	defer func() {
533		if r := recover(); r != nil {
534			if _, ok := r.(runtime.Error); ok {
535				panic(r)
536			}
537			err = r.(error)
538		}
539	}()
540
541	if f.header.version.request() {
542		return nil, NewErrProtocol("got a request frame from server: %v", f.header.version)
543	}
544
545	if f.header.flags&flagTracing == flagTracing {
546		f.readTrace()
547	}
548
549	if f.header.flags&flagWarning == flagWarning {
550		f.header.warnings = f.readStringList()
551	}
552
553	if f.header.flags&flagCustomPayload == flagCustomPayload {
554		f.customPayload = f.readBytesMap()
555	}
556
557	// assumes that the frame body has been read into rbuf
558	switch f.header.op {
559	case opError:
560		frame = f.parseErrorFrame()
561	case opReady:
562		frame = f.parseReadyFrame()
563	case opResult:
564		frame, err = f.parseResultFrame()
565	case opSupported:
566		frame = f.parseSupportedFrame()
567	case opAuthenticate:
568		frame = f.parseAuthenticateFrame()
569	case opAuthChallenge:
570		frame = f.parseAuthChallengeFrame()
571	case opAuthSuccess:
572		frame = f.parseAuthSuccessFrame()
573	case opEvent:
574		frame = f.parseEventFrame()
575	default:
576		return nil, NewErrProtocol("unknown op in frame header: %s", f.header.op)
577	}
578
579	return
580}
581
582func (f *framer) parseErrorFrame() frame {
583	code := f.readInt()
584	msg := f.readString()
585
586	errD := errorFrame{
587		frameHeader: *f.header,
588		code:        code,
589		message:     msg,
590	}
591
592	switch code {
593	case errUnavailable:
594		cl := f.readConsistency()
595		required := f.readInt()
596		alive := f.readInt()
597		return &RequestErrUnavailable{
598			errorFrame:  errD,
599			Consistency: cl,
600			Required:    required,
601			Alive:       alive,
602		}
603	case errWriteTimeout:
604		cl := f.readConsistency()
605		received := f.readInt()
606		blockfor := f.readInt()
607		writeType := f.readString()
608		return &RequestErrWriteTimeout{
609			errorFrame:  errD,
610			Consistency: cl,
611			Received:    received,
612			BlockFor:    blockfor,
613			WriteType:   writeType,
614		}
615	case errReadTimeout:
616		cl := f.readConsistency()
617		received := f.readInt()
618		blockfor := f.readInt()
619		dataPresent := f.readByte()
620		return &RequestErrReadTimeout{
621			errorFrame:  errD,
622			Consistency: cl,
623			Received:    received,
624			BlockFor:    blockfor,
625			DataPresent: dataPresent,
626		}
627	case errAlreadyExists:
628		ks := f.readString()
629		table := f.readString()
630		return &RequestErrAlreadyExists{
631			errorFrame: errD,
632			Keyspace:   ks,
633			Table:      table,
634		}
635	case errUnprepared:
636		stmtId := f.readShortBytes()
637		return &RequestErrUnprepared{
638			errorFrame:  errD,
639			StatementId: copyBytes(stmtId), // defensively copy
640		}
641	case errReadFailure:
642		res := &RequestErrReadFailure{
643			errorFrame: errD,
644		}
645		res.Consistency = f.readConsistency()
646		res.Received = f.readInt()
647		res.BlockFor = f.readInt()
648		if f.proto > protoVersion4 {
649			res.ErrorMap = f.readErrorMap()
650			res.NumFailures = len(res.ErrorMap)
651		} else {
652			res.NumFailures = f.readInt()
653		}
654		res.DataPresent = f.readByte() != 0
655
656		return res
657	case errWriteFailure:
658		res := &RequestErrWriteFailure{
659			errorFrame: errD,
660		}
661		res.Consistency = f.readConsistency()
662		res.Received = f.readInt()
663		res.BlockFor = f.readInt()
664		if f.proto > protoVersion4 {
665			res.ErrorMap = f.readErrorMap()
666			res.NumFailures = len(res.ErrorMap)
667		} else {
668			res.NumFailures = f.readInt()
669		}
670		res.WriteType = f.readString()
671		return res
672	case errFunctionFailure:
673		res := &RequestErrFunctionFailure{
674			errorFrame: errD,
675		}
676		res.Keyspace = f.readString()
677		res.Function = f.readString()
678		res.ArgTypes = f.readStringList()
679		return res
680
681	case errCDCWriteFailure:
682		res := &RequestErrCDCWriteFailure{
683			errorFrame: errD,
684		}
685		return res
686
687	case errInvalid, errBootstrapping, errConfig, errCredentials, errOverloaded,
688		errProtocol, errServer, errSyntax, errTruncate, errUnauthorized:
689		// TODO(zariel): we should have some distinct types for these errors
690		return errD
691	default:
692		panic(fmt.Errorf("unknown error code: 0x%x", errD.code))
693	}
694}
695
696func (f *framer) readErrorMap() (errMap ErrorMap) {
697	errMap = make(ErrorMap)
698	numErrs := f.readInt()
699	for i := 0; i < numErrs; i++ {
700		ip := f.readInetAdressOnly().String()
701		errMap[ip] = f.readShort()
702	}
703	return
704}
705
706func (f *framer) writeHeader(flags byte, op frameOp, stream int) {
707	f.wbuf = f.wbuf[:0]
708	f.wbuf = append(f.wbuf,
709		f.proto,
710		flags,
711	)
712
713	if f.proto > protoVersion2 {
714		f.wbuf = append(f.wbuf,
715			byte(stream>>8),
716			byte(stream),
717		)
718	} else {
719		f.wbuf = append(f.wbuf,
720			byte(stream),
721		)
722	}
723
724	// pad out length
725	f.wbuf = append(f.wbuf,
726		byte(op),
727		0,
728		0,
729		0,
730		0,
731	)
732}
733
734func (f *framer) setLength(length int) {
735	p := 4
736	if f.proto > protoVersion2 {
737		p = 5
738	}
739
740	f.wbuf[p+0] = byte(length >> 24)
741	f.wbuf[p+1] = byte(length >> 16)
742	f.wbuf[p+2] = byte(length >> 8)
743	f.wbuf[p+3] = byte(length)
744}
745
746func (f *framer) finishWrite() error {
747	if len(f.wbuf) > maxFrameSize {
748		// huge app frame, lets remove it so it doesn't bloat the heap
749		f.wbuf = make([]byte, defaultBufSize)
750		return ErrFrameTooBig
751	}
752
753	if f.wbuf[1]&flagCompress == flagCompress {
754		if f.compres == nil {
755			panic("compress flag set with no compressor")
756		}
757
758		// TODO: only compress frames which are big enough
759		compressed, err := f.compres.Encode(f.wbuf[f.headSize:])
760		if err != nil {
761			return err
762		}
763
764		f.wbuf = append(f.wbuf[:f.headSize], compressed...)
765	}
766	length := len(f.wbuf) - f.headSize
767	f.setLength(length)
768
769	_, err := f.w.Write(f.wbuf)
770	if err != nil {
771		return err
772	}
773
774	return nil
775}
776
777func (f *framer) readTrace() {
778	f.traceID = f.readUUID().Bytes()
779}
780
781type readyFrame struct {
782	frameHeader
783}
784
785func (f *framer) parseReadyFrame() frame {
786	return &readyFrame{
787		frameHeader: *f.header,
788	}
789}
790
791type supportedFrame struct {
792	frameHeader
793
794	supported map[string][]string
795}
796
797// TODO: if we move the body buffer onto the frameHeader then we only need a single
798// framer, and can move the methods onto the header.
799func (f *framer) parseSupportedFrame() frame {
800	return &supportedFrame{
801		frameHeader: *f.header,
802
803		supported: f.readStringMultiMap(),
804	}
805}
806
807type writeStartupFrame struct {
808	opts map[string]string
809}
810
811func (w writeStartupFrame) String() string {
812	return fmt.Sprintf("[startup opts=%+v]", w.opts)
813}
814
815func (w *writeStartupFrame) writeFrame(f *framer, streamID int) error {
816	f.writeHeader(f.flags&^flagCompress, opStartup, streamID)
817	f.writeStringMap(w.opts)
818
819	return f.finishWrite()
820}
821
822type writePrepareFrame struct {
823	statement     string
824	keyspace      string
825	customPayload map[string][]byte
826}
827
828func (w *writePrepareFrame) writeFrame(f *framer, streamID int) error {
829	if len(w.customPayload) > 0 {
830		f.payload()
831	}
832	f.writeHeader(f.flags, opPrepare, streamID)
833	f.writeCustomPayload(&w.customPayload)
834	f.writeLongString(w.statement)
835
836	var flags uint32 = 0
837	if w.keyspace != "" {
838		if f.proto > protoVersion4 {
839			flags |= flagWithPreparedKeyspace
840		} else {
841			panic(fmt.Errorf("the keyspace can only be set with protocol 5 or higher"))
842		}
843	}
844	if f.proto > protoVersion4 {
845		f.writeUint(flags)
846	}
847	if w.keyspace != "" {
848		f.writeString(w.keyspace)
849	}
850
851	return f.finishWrite()
852}
853
854func (f *framer) readTypeInfo() TypeInfo {
855	// TODO: factor this out so the same code paths can be used to parse custom
856	// types and other types, as much of the logic will be duplicated.
857	id := f.readShort()
858
859	simple := NativeType{
860		proto: f.proto,
861		typ:   Type(id),
862	}
863
864	if simple.typ == TypeCustom {
865		simple.custom = f.readString()
866		if cassType := getApacheCassandraType(simple.custom); cassType != TypeCustom {
867			simple.typ = cassType
868		}
869	}
870
871	switch simple.typ {
872	case TypeTuple:
873		n := f.readShort()
874		tuple := TupleTypeInfo{
875			NativeType: simple,
876			Elems:      make([]TypeInfo, n),
877		}
878
879		for i := 0; i < int(n); i++ {
880			tuple.Elems[i] = f.readTypeInfo()
881		}
882
883		return tuple
884
885	case TypeUDT:
886		udt := UDTTypeInfo{
887			NativeType: simple,
888		}
889		udt.KeySpace = f.readString()
890		udt.Name = f.readString()
891
892		n := f.readShort()
893		udt.Elements = make([]UDTField, n)
894		for i := 0; i < int(n); i++ {
895			field := &udt.Elements[i]
896			field.Name = f.readString()
897			field.Type = f.readTypeInfo()
898		}
899
900		return udt
901	case TypeMap, TypeList, TypeSet:
902		collection := CollectionType{
903			NativeType: simple,
904		}
905
906		if simple.typ == TypeMap {
907			collection.Key = f.readTypeInfo()
908		}
909
910		collection.Elem = f.readTypeInfo()
911
912		return collection
913	}
914
915	return simple
916}
917
918type preparedMetadata struct {
919	resultMetadata
920
921	// proto v4+
922	pkeyColumns []int
923}
924
925func (r preparedMetadata) String() string {
926	return fmt.Sprintf("[prepared flags=0x%x pkey=%v paging_state=% X columns=%v col_count=%d actual_col_count=%d]", r.flags, r.pkeyColumns, r.pagingState, r.columns, r.colCount, r.actualColCount)
927}
928
929func (f *framer) parsePreparedMetadata() preparedMetadata {
930	// TODO: deduplicate this from parseMetadata
931	meta := preparedMetadata{}
932
933	meta.flags = f.readInt()
934	meta.colCount = f.readInt()
935	if meta.colCount < 0 {
936		panic(fmt.Errorf("received negative column count: %d", meta.colCount))
937	}
938	meta.actualColCount = meta.colCount
939
940	if f.proto >= protoVersion4 {
941		pkeyCount := f.readInt()
942		pkeys := make([]int, pkeyCount)
943		for i := 0; i < pkeyCount; i++ {
944			pkeys[i] = int(f.readShort())
945		}
946		meta.pkeyColumns = pkeys
947	}
948
949	if meta.flags&flagHasMorePages == flagHasMorePages {
950		meta.pagingState = copyBytes(f.readBytes())
951	}
952
953	if meta.flags&flagNoMetaData == flagNoMetaData {
954		return meta
955	}
956
957	var keyspace, table string
958	globalSpec := meta.flags&flagGlobalTableSpec == flagGlobalTableSpec
959	if globalSpec {
960		keyspace = f.readString()
961		table = f.readString()
962	}
963
964	var cols []ColumnInfo
965	if meta.colCount < 1000 {
966		// preallocate columninfo to avoid excess copying
967		cols = make([]ColumnInfo, meta.colCount)
968		for i := 0; i < meta.colCount; i++ {
969			f.readCol(&cols[i], &meta.resultMetadata, globalSpec, keyspace, table)
970		}
971	} else {
972		// use append, huge number of columns usually indicates a corrupt frame or
973		// just a huge row.
974		for i := 0; i < meta.colCount; i++ {
975			var col ColumnInfo
976			f.readCol(&col, &meta.resultMetadata, globalSpec, keyspace, table)
977			cols = append(cols, col)
978		}
979	}
980
981	meta.columns = cols
982
983	return meta
984}
985
986type resultMetadata struct {
987	flags int
988
989	// only if flagPageState
990	pagingState []byte
991
992	columns  []ColumnInfo
993	colCount int
994
995	// this is a count of the total number of columns which can be scanned,
996	// it is at minimum len(columns) but may be larger, for instance when a column
997	// is a UDT or tuple.
998	actualColCount int
999}
1000
1001func (r *resultMetadata) morePages() bool {
1002	return r.flags&flagHasMorePages == flagHasMorePages
1003}
1004
1005func (r resultMetadata) String() string {
1006	return fmt.Sprintf("[metadata flags=0x%x paging_state=% X columns=%v]", r.flags, r.pagingState, r.columns)
1007}
1008
1009func (f *framer) readCol(col *ColumnInfo, meta *resultMetadata, globalSpec bool, keyspace, table string) {
1010	if !globalSpec {
1011		col.Keyspace = f.readString()
1012		col.Table = f.readString()
1013	} else {
1014		col.Keyspace = keyspace
1015		col.Table = table
1016	}
1017
1018	col.Name = f.readString()
1019	col.TypeInfo = f.readTypeInfo()
1020	switch v := col.TypeInfo.(type) {
1021	// maybe also UDT
1022	case TupleTypeInfo:
1023		// -1 because we already included the tuple column
1024		meta.actualColCount += len(v.Elems) - 1
1025	}
1026}
1027
1028func (f *framer) parseResultMetadata() resultMetadata {
1029	var meta resultMetadata
1030
1031	meta.flags = f.readInt()
1032	meta.colCount = f.readInt()
1033	if meta.colCount < 0 {
1034		panic(fmt.Errorf("received negative column count: %d", meta.colCount))
1035	}
1036	meta.actualColCount = meta.colCount
1037
1038	if meta.flags&flagHasMorePages == flagHasMorePages {
1039		meta.pagingState = copyBytes(f.readBytes())
1040	}
1041
1042	if meta.flags&flagNoMetaData == flagNoMetaData {
1043		return meta
1044	}
1045
1046	var keyspace, table string
1047	globalSpec := meta.flags&flagGlobalTableSpec == flagGlobalTableSpec
1048	if globalSpec {
1049		keyspace = f.readString()
1050		table = f.readString()
1051	}
1052
1053	var cols []ColumnInfo
1054	if meta.colCount < 1000 {
1055		// preallocate columninfo to avoid excess copying
1056		cols = make([]ColumnInfo, meta.colCount)
1057		for i := 0; i < meta.colCount; i++ {
1058			f.readCol(&cols[i], &meta, globalSpec, keyspace, table)
1059		}
1060
1061	} else {
1062		// use append, huge number of columns usually indicates a corrupt frame or
1063		// just a huge row.
1064		for i := 0; i < meta.colCount; i++ {
1065			var col ColumnInfo
1066			f.readCol(&col, &meta, globalSpec, keyspace, table)
1067			cols = append(cols, col)
1068		}
1069	}
1070
1071	meta.columns = cols
1072
1073	return meta
1074}
1075
1076type resultVoidFrame struct {
1077	frameHeader
1078}
1079
1080func (f *resultVoidFrame) String() string {
1081	return "[result_void]"
1082}
1083
1084func (f *framer) parseResultFrame() (frame, error) {
1085	kind := f.readInt()
1086
1087	switch kind {
1088	case resultKindVoid:
1089		return &resultVoidFrame{frameHeader: *f.header}, nil
1090	case resultKindRows:
1091		return f.parseResultRows(), nil
1092	case resultKindKeyspace:
1093		return f.parseResultSetKeyspace(), nil
1094	case resultKindPrepared:
1095		return f.parseResultPrepared(), nil
1096	case resultKindSchemaChanged:
1097		return f.parseResultSchemaChange(), nil
1098	}
1099
1100	return nil, NewErrProtocol("unknown result kind: %x", kind)
1101}
1102
1103type resultRowsFrame struct {
1104	frameHeader
1105
1106	meta resultMetadata
1107	// dont parse the rows here as we only need to do it once
1108	numRows int
1109}
1110
1111func (f *resultRowsFrame) String() string {
1112	return fmt.Sprintf("[result_rows meta=%v]", f.meta)
1113}
1114
1115func (f *framer) parseResultRows() frame {
1116	result := &resultRowsFrame{}
1117	result.meta = f.parseResultMetadata()
1118
1119	result.numRows = f.readInt()
1120	if result.numRows < 0 {
1121		panic(fmt.Errorf("invalid row_count in result frame: %d", result.numRows))
1122	}
1123
1124	return result
1125}
1126
1127type resultKeyspaceFrame struct {
1128	frameHeader
1129	keyspace string
1130}
1131
1132func (r *resultKeyspaceFrame) String() string {
1133	return fmt.Sprintf("[result_keyspace keyspace=%s]", r.keyspace)
1134}
1135
1136func (f *framer) parseResultSetKeyspace() frame {
1137	return &resultKeyspaceFrame{
1138		frameHeader: *f.header,
1139		keyspace:    f.readString(),
1140	}
1141}
1142
1143type resultPreparedFrame struct {
1144	frameHeader
1145
1146	preparedID []byte
1147	reqMeta    preparedMetadata
1148	respMeta   resultMetadata
1149}
1150
1151func (f *framer) parseResultPrepared() frame {
1152	frame := &resultPreparedFrame{
1153		frameHeader: *f.header,
1154		preparedID:  f.readShortBytes(),
1155		reqMeta:     f.parsePreparedMetadata(),
1156	}
1157
1158	if f.proto < protoVersion2 {
1159		return frame
1160	}
1161
1162	frame.respMeta = f.parseResultMetadata()
1163
1164	return frame
1165}
1166
1167type schemaChangeKeyspace struct {
1168	frameHeader
1169
1170	change   string
1171	keyspace string
1172}
1173
1174func (f schemaChangeKeyspace) String() string {
1175	return fmt.Sprintf("[event schema_change_keyspace change=%q keyspace=%q]", f.change, f.keyspace)
1176}
1177
1178type schemaChangeTable struct {
1179	frameHeader
1180
1181	change   string
1182	keyspace string
1183	object   string
1184}
1185
1186func (f schemaChangeTable) String() string {
1187	return fmt.Sprintf("[event schema_change change=%q keyspace=%q object=%q]", f.change, f.keyspace, f.object)
1188}
1189
1190type schemaChangeType struct {
1191	frameHeader
1192
1193	change   string
1194	keyspace string
1195	object   string
1196}
1197
1198type schemaChangeFunction struct {
1199	frameHeader
1200
1201	change   string
1202	keyspace string
1203	name     string
1204	args     []string
1205}
1206
1207type schemaChangeAggregate struct {
1208	frameHeader
1209
1210	change   string
1211	keyspace string
1212	name     string
1213	args     []string
1214}
1215
1216func (f *framer) parseResultSchemaChange() frame {
1217	if f.proto <= protoVersion2 {
1218		change := f.readString()
1219		keyspace := f.readString()
1220		table := f.readString()
1221
1222		if table != "" {
1223			return &schemaChangeTable{
1224				frameHeader: *f.header,
1225				change:      change,
1226				keyspace:    keyspace,
1227				object:      table,
1228			}
1229		} else {
1230			return &schemaChangeKeyspace{
1231				frameHeader: *f.header,
1232				change:      change,
1233				keyspace:    keyspace,
1234			}
1235		}
1236	} else {
1237		change := f.readString()
1238		target := f.readString()
1239
1240		// TODO: could just use a separate type for each target
1241		switch target {
1242		case "KEYSPACE":
1243			frame := &schemaChangeKeyspace{
1244				frameHeader: *f.header,
1245				change:      change,
1246			}
1247
1248			frame.keyspace = f.readString()
1249
1250			return frame
1251		case "TABLE":
1252			frame := &schemaChangeTable{
1253				frameHeader: *f.header,
1254				change:      change,
1255			}
1256
1257			frame.keyspace = f.readString()
1258			frame.object = f.readString()
1259
1260			return frame
1261		case "TYPE":
1262			frame := &schemaChangeType{
1263				frameHeader: *f.header,
1264				change:      change,
1265			}
1266
1267			frame.keyspace = f.readString()
1268			frame.object = f.readString()
1269
1270			return frame
1271		case "FUNCTION":
1272			frame := &schemaChangeFunction{
1273				frameHeader: *f.header,
1274				change:      change,
1275			}
1276
1277			frame.keyspace = f.readString()
1278			frame.name = f.readString()
1279			frame.args = f.readStringList()
1280
1281			return frame
1282		case "AGGREGATE":
1283			frame := &schemaChangeAggregate{
1284				frameHeader: *f.header,
1285				change:      change,
1286			}
1287
1288			frame.keyspace = f.readString()
1289			frame.name = f.readString()
1290			frame.args = f.readStringList()
1291
1292			return frame
1293		default:
1294			panic(fmt.Errorf("gocql: unknown SCHEMA_CHANGE target: %q change: %q", target, change))
1295		}
1296	}
1297
1298}
1299
1300type authenticateFrame struct {
1301	frameHeader
1302
1303	class string
1304}
1305
1306func (a *authenticateFrame) String() string {
1307	return fmt.Sprintf("[authenticate class=%q]", a.class)
1308}
1309
1310func (f *framer) parseAuthenticateFrame() frame {
1311	return &authenticateFrame{
1312		frameHeader: *f.header,
1313		class:       f.readString(),
1314	}
1315}
1316
1317type authSuccessFrame struct {
1318	frameHeader
1319
1320	data []byte
1321}
1322
1323func (a *authSuccessFrame) String() string {
1324	return fmt.Sprintf("[auth_success data=%q]", a.data)
1325}
1326
1327func (f *framer) parseAuthSuccessFrame() frame {
1328	return &authSuccessFrame{
1329		frameHeader: *f.header,
1330		data:        f.readBytes(),
1331	}
1332}
1333
1334type authChallengeFrame struct {
1335	frameHeader
1336
1337	data []byte
1338}
1339
1340func (a *authChallengeFrame) String() string {
1341	return fmt.Sprintf("[auth_challenge data=%q]", a.data)
1342}
1343
1344func (f *framer) parseAuthChallengeFrame() frame {
1345	return &authChallengeFrame{
1346		frameHeader: *f.header,
1347		data:        f.readBytes(),
1348	}
1349}
1350
1351type statusChangeEventFrame struct {
1352	frameHeader
1353
1354	change string
1355	host   net.IP
1356	port   int
1357}
1358
1359func (t statusChangeEventFrame) String() string {
1360	return fmt.Sprintf("[status_change change=%s host=%v port=%v]", t.change, t.host, t.port)
1361}
1362
1363// essentially the same as statusChange
1364type topologyChangeEventFrame struct {
1365	frameHeader
1366
1367	change string
1368	host   net.IP
1369	port   int
1370}
1371
1372func (t topologyChangeEventFrame) String() string {
1373	return fmt.Sprintf("[topology_change change=%s host=%v port=%v]", t.change, t.host, t.port)
1374}
1375
1376func (f *framer) parseEventFrame() frame {
1377	eventType := f.readString()
1378
1379	switch eventType {
1380	case "TOPOLOGY_CHANGE":
1381		frame := &topologyChangeEventFrame{frameHeader: *f.header}
1382		frame.change = f.readString()
1383		frame.host, frame.port = f.readInet()
1384
1385		return frame
1386	case "STATUS_CHANGE":
1387		frame := &statusChangeEventFrame{frameHeader: *f.header}
1388		frame.change = f.readString()
1389		frame.host, frame.port = f.readInet()
1390
1391		return frame
1392	case "SCHEMA_CHANGE":
1393		// this should work for all versions
1394		return f.parseResultSchemaChange()
1395	default:
1396		panic(fmt.Errorf("gocql: unknown event type: %q", eventType))
1397	}
1398
1399}
1400
1401type writeAuthResponseFrame struct {
1402	data []byte
1403}
1404
1405func (a *writeAuthResponseFrame) String() string {
1406	return fmt.Sprintf("[auth_response data=%q]", a.data)
1407}
1408
1409func (a *writeAuthResponseFrame) writeFrame(framer *framer, streamID int) error {
1410	return framer.writeAuthResponseFrame(streamID, a.data)
1411}
1412
1413func (f *framer) writeAuthResponseFrame(streamID int, data []byte) error {
1414	f.writeHeader(f.flags, opAuthResponse, streamID)
1415	f.writeBytes(data)
1416	return f.finishWrite()
1417}
1418
1419type queryValues struct {
1420	value []byte
1421
1422	// optional name, will set With names for values flag
1423	name    string
1424	isUnset bool
1425}
1426
1427type queryParams struct {
1428	consistency Consistency
1429	// v2+
1430	skipMeta          bool
1431	values            []queryValues
1432	pageSize          int
1433	pagingState       []byte
1434	serialConsistency SerialConsistency
1435	// v3+
1436	defaultTimestamp      bool
1437	defaultTimestampValue int64
1438	// v5+
1439	keyspace string
1440}
1441
1442func (q queryParams) String() string {
1443	return fmt.Sprintf("[query_params consistency=%v skip_meta=%v page_size=%d paging_state=%q serial_consistency=%v default_timestamp=%v values=%v keyspace=%s]",
1444		q.consistency, q.skipMeta, q.pageSize, q.pagingState, q.serialConsistency, q.defaultTimestamp, q.values, q.keyspace)
1445}
1446
1447func (f *framer) writeQueryParams(opts *queryParams) {
1448	f.writeConsistency(opts.consistency)
1449
1450	if f.proto == protoVersion1 {
1451		return
1452	}
1453
1454	var flags byte
1455	if len(opts.values) > 0 {
1456		flags |= flagValues
1457	}
1458	if opts.skipMeta {
1459		flags |= flagSkipMetaData
1460	}
1461	if opts.pageSize > 0 {
1462		flags |= flagPageSize
1463	}
1464	if len(opts.pagingState) > 0 {
1465		flags |= flagWithPagingState
1466	}
1467	if opts.serialConsistency > 0 {
1468		flags |= flagWithSerialConsistency
1469	}
1470
1471	names := false
1472
1473	// protoV3 specific things
1474	if f.proto > protoVersion2 {
1475		if opts.defaultTimestamp {
1476			flags |= flagDefaultTimestamp
1477		}
1478
1479		if len(opts.values) > 0 && opts.values[0].name != "" {
1480			flags |= flagWithNameValues
1481			names = true
1482		}
1483	}
1484
1485	if opts.keyspace != "" {
1486		if f.proto > protoVersion4 {
1487			flags |= flagWithKeyspace
1488		} else {
1489			panic(fmt.Errorf("the keyspace can only be set with protocol 5 or higher"))
1490		}
1491	}
1492
1493	if f.proto > protoVersion4 {
1494		f.writeUint(uint32(flags))
1495	} else {
1496		f.writeByte(flags)
1497	}
1498
1499	if n := len(opts.values); n > 0 {
1500		f.writeShort(uint16(n))
1501
1502		for i := 0; i < n; i++ {
1503			if names {
1504				f.writeString(opts.values[i].name)
1505			}
1506			if opts.values[i].isUnset {
1507				f.writeUnset()
1508			} else {
1509				f.writeBytes(opts.values[i].value)
1510			}
1511		}
1512	}
1513
1514	if opts.pageSize > 0 {
1515		f.writeInt(int32(opts.pageSize))
1516	}
1517
1518	if len(opts.pagingState) > 0 {
1519		f.writeBytes(opts.pagingState)
1520	}
1521
1522	if opts.serialConsistency > 0 {
1523		f.writeConsistency(Consistency(opts.serialConsistency))
1524	}
1525
1526	if f.proto > protoVersion2 && opts.defaultTimestamp {
1527		// timestamp in microseconds
1528		var ts int64
1529		if opts.defaultTimestampValue != 0 {
1530			ts = opts.defaultTimestampValue
1531		} else {
1532			ts = time.Now().UnixNano() / 1000
1533		}
1534		f.writeLong(ts)
1535	}
1536
1537	if opts.keyspace != "" {
1538		f.writeString(opts.keyspace)
1539	}
1540}
1541
1542type writeQueryFrame struct {
1543	statement string
1544	params    queryParams
1545
1546	// v4+
1547	customPayload map[string][]byte
1548}
1549
1550func (w *writeQueryFrame) String() string {
1551	return fmt.Sprintf("[query statement=%q params=%v]", w.statement, w.params)
1552}
1553
1554func (w *writeQueryFrame) writeFrame(framer *framer, streamID int) error {
1555	return framer.writeQueryFrame(streamID, w.statement, &w.params, w.customPayload)
1556}
1557
1558func (f *framer) writeQueryFrame(streamID int, statement string, params *queryParams, customPayload map[string][]byte) error {
1559	if len(customPayload) > 0 {
1560		f.payload()
1561	}
1562	f.writeHeader(f.flags, opQuery, streamID)
1563	f.writeCustomPayload(&customPayload)
1564	f.writeLongString(statement)
1565	f.writeQueryParams(params)
1566
1567	return f.finishWrite()
1568}
1569
1570type frameWriter interface {
1571	writeFrame(framer *framer, streamID int) error
1572}
1573
1574type frameWriterFunc func(framer *framer, streamID int) error
1575
1576func (f frameWriterFunc) writeFrame(framer *framer, streamID int) error {
1577	return f(framer, streamID)
1578}
1579
1580type writeExecuteFrame struct {
1581	preparedID []byte
1582	params     queryParams
1583
1584	// v4+
1585	customPayload map[string][]byte
1586}
1587
1588func (e *writeExecuteFrame) String() string {
1589	return fmt.Sprintf("[execute id=% X params=%v]", e.preparedID, &e.params)
1590}
1591
1592func (e *writeExecuteFrame) writeFrame(fr *framer, streamID int) error {
1593	return fr.writeExecuteFrame(streamID, e.preparedID, &e.params, &e.customPayload)
1594}
1595
1596func (f *framer) writeExecuteFrame(streamID int, preparedID []byte, params *queryParams, customPayload *map[string][]byte) error {
1597	if len(*customPayload) > 0 {
1598		f.payload()
1599	}
1600	f.writeHeader(f.flags, opExecute, streamID)
1601	f.writeCustomPayload(customPayload)
1602	f.writeShortBytes(preparedID)
1603	if f.proto > protoVersion1 {
1604		f.writeQueryParams(params)
1605	} else {
1606		n := len(params.values)
1607		f.writeShort(uint16(n))
1608		for i := 0; i < n; i++ {
1609			if params.values[i].isUnset {
1610				f.writeUnset()
1611			} else {
1612				f.writeBytes(params.values[i].value)
1613			}
1614		}
1615		f.writeConsistency(params.consistency)
1616	}
1617
1618	return f.finishWrite()
1619}
1620
1621// TODO: can we replace BatchStatemt with batchStatement? As they prety much
1622// duplicate each other
1623type batchStatment struct {
1624	preparedID []byte
1625	statement  string
1626	values     []queryValues
1627}
1628
1629type writeBatchFrame struct {
1630	typ         BatchType
1631	statements  []batchStatment
1632	consistency Consistency
1633
1634	// v3+
1635	serialConsistency     SerialConsistency
1636	defaultTimestamp      bool
1637	defaultTimestampValue int64
1638
1639	//v4+
1640	customPayload map[string][]byte
1641}
1642
1643func (w *writeBatchFrame) writeFrame(framer *framer, streamID int) error {
1644	return framer.writeBatchFrame(streamID, w, w.customPayload)
1645}
1646
1647func (f *framer) writeBatchFrame(streamID int, w *writeBatchFrame, customPayload map[string][]byte) error {
1648	if len(customPayload) > 0 {
1649		f.payload()
1650	}
1651	f.writeHeader(f.flags, opBatch, streamID)
1652	f.writeCustomPayload(&customPayload)
1653	f.writeByte(byte(w.typ))
1654
1655	n := len(w.statements)
1656	f.writeShort(uint16(n))
1657
1658	var flags byte
1659
1660	for i := 0; i < n; i++ {
1661		b := &w.statements[i]
1662		if len(b.preparedID) == 0 {
1663			f.writeByte(0)
1664			f.writeLongString(b.statement)
1665		} else {
1666			f.writeByte(1)
1667			f.writeShortBytes(b.preparedID)
1668		}
1669
1670		f.writeShort(uint16(len(b.values)))
1671		for j := range b.values {
1672			col := b.values[j]
1673			if f.proto > protoVersion2 && col.name != "" {
1674				// TODO: move this check into the caller and set a flag on writeBatchFrame
1675				// to indicate using named values
1676				if f.proto <= protoVersion5 {
1677					return fmt.Errorf("gocql: named query values are not supported in batches, please see https://issues.apache.org/jira/browse/CASSANDRA-10246")
1678				}
1679				flags |= flagWithNameValues
1680				f.writeString(col.name)
1681			}
1682			if col.isUnset {
1683				f.writeUnset()
1684			} else {
1685				f.writeBytes(col.value)
1686			}
1687		}
1688	}
1689
1690	f.writeConsistency(w.consistency)
1691
1692	if f.proto > protoVersion2 {
1693		if w.serialConsistency > 0 {
1694			flags |= flagWithSerialConsistency
1695		}
1696		if w.defaultTimestamp {
1697			flags |= flagDefaultTimestamp
1698		}
1699
1700		if f.proto > protoVersion4 {
1701			f.writeUint(uint32(flags))
1702		} else {
1703			f.writeByte(flags)
1704		}
1705
1706		if w.serialConsistency > 0 {
1707			f.writeConsistency(Consistency(w.serialConsistency))
1708		}
1709
1710		if w.defaultTimestamp {
1711			var ts int64
1712			if w.defaultTimestampValue != 0 {
1713				ts = w.defaultTimestampValue
1714			} else {
1715				ts = time.Now().UnixNano() / 1000
1716			}
1717			f.writeLong(ts)
1718		}
1719	}
1720
1721	return f.finishWrite()
1722}
1723
1724type writeOptionsFrame struct{}
1725
1726func (w *writeOptionsFrame) writeFrame(framer *framer, streamID int) error {
1727	return framer.writeOptionsFrame(streamID, w)
1728}
1729
1730func (f *framer) writeOptionsFrame(stream int, _ *writeOptionsFrame) error {
1731	f.writeHeader(f.flags&^flagCompress, opOptions, stream)
1732	return f.finishWrite()
1733}
1734
1735type writeRegisterFrame struct {
1736	events []string
1737}
1738
1739func (w *writeRegisterFrame) writeFrame(framer *framer, streamID int) error {
1740	return framer.writeRegisterFrame(streamID, w)
1741}
1742
1743func (f *framer) writeRegisterFrame(streamID int, w *writeRegisterFrame) error {
1744	f.writeHeader(f.flags, opRegister, streamID)
1745	f.writeStringList(w.events)
1746
1747	return f.finishWrite()
1748}
1749
1750func (f *framer) readByte() byte {
1751	if len(f.rbuf) < 1 {
1752		panic(fmt.Errorf("not enough bytes in buffer to read byte require 1 got: %d", len(f.rbuf)))
1753	}
1754
1755	b := f.rbuf[0]
1756	f.rbuf = f.rbuf[1:]
1757	return b
1758}
1759
1760func (f *framer) readInt() (n int) {
1761	if len(f.rbuf) < 4 {
1762		panic(fmt.Errorf("not enough bytes in buffer to read int require 4 got: %d", len(f.rbuf)))
1763	}
1764
1765	n = int(int32(f.rbuf[0])<<24 | int32(f.rbuf[1])<<16 | int32(f.rbuf[2])<<8 | int32(f.rbuf[3]))
1766	f.rbuf = f.rbuf[4:]
1767	return
1768}
1769
1770func (f *framer) readShort() (n uint16) {
1771	if len(f.rbuf) < 2 {
1772		panic(fmt.Errorf("not enough bytes in buffer to read short require 2 got: %d", len(f.rbuf)))
1773	}
1774	n = uint16(f.rbuf[0])<<8 | uint16(f.rbuf[1])
1775	f.rbuf = f.rbuf[2:]
1776	return
1777}
1778
1779func (f *framer) readString() (s string) {
1780	size := f.readShort()
1781
1782	if len(f.rbuf) < int(size) {
1783		panic(fmt.Errorf("not enough bytes in buffer to read string require %d got: %d", size, len(f.rbuf)))
1784	}
1785
1786	s = string(f.rbuf[:size])
1787	f.rbuf = f.rbuf[size:]
1788	return
1789}
1790
1791func (f *framer) readLongString() (s string) {
1792	size := f.readInt()
1793
1794	if len(f.rbuf) < size {
1795		panic(fmt.Errorf("not enough bytes in buffer to read long string require %d got: %d", size, len(f.rbuf)))
1796	}
1797
1798	s = string(f.rbuf[:size])
1799	f.rbuf = f.rbuf[size:]
1800	return
1801}
1802
1803func (f *framer) readUUID() *UUID {
1804	if len(f.rbuf) < 16 {
1805		panic(fmt.Errorf("not enough bytes in buffer to read uuid require %d got: %d", 16, len(f.rbuf)))
1806	}
1807
1808	// TODO: how to handle this error, if it is a uuid, then sureley, problems?
1809	u, _ := UUIDFromBytes(f.rbuf[:16])
1810	f.rbuf = f.rbuf[16:]
1811	return &u
1812}
1813
1814func (f *framer) readStringList() []string {
1815	size := f.readShort()
1816
1817	l := make([]string, size)
1818	for i := 0; i < int(size); i++ {
1819		l[i] = f.readString()
1820	}
1821
1822	return l
1823}
1824
1825func (f *framer) readBytesInternal() ([]byte, error) {
1826	size := f.readInt()
1827	if size < 0 {
1828		return nil, nil
1829	}
1830
1831	if len(f.rbuf) < size {
1832		return nil, fmt.Errorf("not enough bytes in buffer to read bytes require %d got: %d", size, len(f.rbuf))
1833	}
1834
1835	l := f.rbuf[:size]
1836	f.rbuf = f.rbuf[size:]
1837
1838	return l, nil
1839}
1840
1841func (f *framer) readBytes() []byte {
1842	l, err := f.readBytesInternal()
1843	if err != nil {
1844		panic(err)
1845	}
1846
1847	return l
1848}
1849
1850func (f *framer) readShortBytes() []byte {
1851	size := f.readShort()
1852	if len(f.rbuf) < int(size) {
1853		panic(fmt.Errorf("not enough bytes in buffer to read short bytes: require %d got %d", size, len(f.rbuf)))
1854	}
1855
1856	l := f.rbuf[:size]
1857	f.rbuf = f.rbuf[size:]
1858
1859	return l
1860}
1861
1862func (f *framer) readInetAdressOnly() net.IP {
1863	if len(f.rbuf) < 1 {
1864		panic(fmt.Errorf("not enough bytes in buffer to read inet size require %d got: %d", 1, len(f.rbuf)))
1865	}
1866
1867	size := f.rbuf[0]
1868	f.rbuf = f.rbuf[1:]
1869
1870	if !(size == 4 || size == 16) {
1871		panic(fmt.Errorf("invalid IP size: %d", size))
1872	}
1873
1874	if len(f.rbuf) < 1 {
1875		panic(fmt.Errorf("not enough bytes in buffer to read inet require %d got: %d", size, len(f.rbuf)))
1876	}
1877
1878	ip := make([]byte, size)
1879	copy(ip, f.rbuf[:size])
1880	f.rbuf = f.rbuf[size:]
1881	return net.IP(ip)
1882}
1883
1884func (f *framer) readInet() (net.IP, int) {
1885	return f.readInetAdressOnly(), f.readInt()
1886}
1887
1888func (f *framer) readConsistency() Consistency {
1889	return Consistency(f.readShort())
1890}
1891
1892func (f *framer) readBytesMap() map[string][]byte {
1893	size := f.readShort()
1894	m := make(map[string][]byte, size)
1895
1896	for i := 0; i < int(size); i++ {
1897		k := f.readString()
1898		v := f.readBytes()
1899		m[k] = v
1900	}
1901
1902	return m
1903}
1904
1905func (f *framer) readStringMultiMap() map[string][]string {
1906	size := f.readShort()
1907	m := make(map[string][]string, size)
1908
1909	for i := 0; i < int(size); i++ {
1910		k := f.readString()
1911		v := f.readStringList()
1912		m[k] = v
1913	}
1914
1915	return m
1916}
1917
1918func (f *framer) writeByte(b byte) {
1919	f.wbuf = append(f.wbuf, b)
1920}
1921
1922func appendBytes(p []byte, d []byte) []byte {
1923	if d == nil {
1924		return appendInt(p, -1)
1925	}
1926	p = appendInt(p, int32(len(d)))
1927	p = append(p, d...)
1928	return p
1929}
1930
1931func appendShort(p []byte, n uint16) []byte {
1932	return append(p,
1933		byte(n>>8),
1934		byte(n),
1935	)
1936}
1937
1938func appendInt(p []byte, n int32) []byte {
1939	return append(p, byte(n>>24),
1940		byte(n>>16),
1941		byte(n>>8),
1942		byte(n))
1943}
1944
1945func appendUint(p []byte, n uint32) []byte {
1946	return append(p, byte(n>>24),
1947		byte(n>>16),
1948		byte(n>>8),
1949		byte(n))
1950}
1951
1952func appendLong(p []byte, n int64) []byte {
1953	return append(p,
1954		byte(n>>56),
1955		byte(n>>48),
1956		byte(n>>40),
1957		byte(n>>32),
1958		byte(n>>24),
1959		byte(n>>16),
1960		byte(n>>8),
1961		byte(n),
1962	)
1963}
1964
1965func (f *framer) writeCustomPayload(customPayload *map[string][]byte) {
1966	if len(*customPayload) > 0 {
1967		if f.proto < protoVersion4 {
1968			panic("Custom payload is not supported with version V3 or less")
1969		}
1970		f.writeBytesMap(*customPayload)
1971	}
1972}
1973
1974// these are protocol level binary types
1975func (f *framer) writeInt(n int32) {
1976	f.wbuf = appendInt(f.wbuf, n)
1977}
1978
1979func (f *framer) writeUint(n uint32) {
1980	f.wbuf = appendUint(f.wbuf, n)
1981}
1982
1983func (f *framer) writeShort(n uint16) {
1984	f.wbuf = appendShort(f.wbuf, n)
1985}
1986
1987func (f *framer) writeLong(n int64) {
1988	f.wbuf = appendLong(f.wbuf, n)
1989}
1990
1991func (f *framer) writeString(s string) {
1992	f.writeShort(uint16(len(s)))
1993	f.wbuf = append(f.wbuf, s...)
1994}
1995
1996func (f *framer) writeLongString(s string) {
1997	f.writeInt(int32(len(s)))
1998	f.wbuf = append(f.wbuf, s...)
1999}
2000
2001func (f *framer) writeStringList(l []string) {
2002	f.writeShort(uint16(len(l)))
2003	for _, s := range l {
2004		f.writeString(s)
2005	}
2006}
2007
2008func (f *framer) writeUnset() {
2009	// Protocol version 4 specifies that bind variables do not require having a
2010	// value when executing a statement.   Bind variables without a value are
2011	// called 'unset'. The 'unset' bind variable is serialized as the int
2012	// value '-2' without following bytes.
2013	f.writeInt(-2)
2014}
2015
2016func (f *framer) writeBytes(p []byte) {
2017	// TODO: handle null case correctly,
2018	//     [bytes]        A [int] n, followed by n bytes if n >= 0. If n < 0,
2019	//					  no byte should follow and the value represented is `null`.
2020	if p == nil {
2021		f.writeInt(-1)
2022	} else {
2023		f.writeInt(int32(len(p)))
2024		f.wbuf = append(f.wbuf, p...)
2025	}
2026}
2027
2028func (f *framer) writeShortBytes(p []byte) {
2029	f.writeShort(uint16(len(p)))
2030	f.wbuf = append(f.wbuf, p...)
2031}
2032
2033func (f *framer) writeConsistency(cons Consistency) {
2034	f.writeShort(uint16(cons))
2035}
2036
2037func (f *framer) writeStringMap(m map[string]string) {
2038	f.writeShort(uint16(len(m)))
2039	for k, v := range m {
2040		f.writeString(k)
2041		f.writeString(v)
2042	}
2043}
2044
2045func (f *framer) writeBytesMap(m map[string][]byte) {
2046	f.writeShort(uint16(len(m)))
2047	for k, v := range m {
2048		f.writeString(k)
2049		f.writeBytes(v)
2050	}
2051}
2052