1// Copyright (c) 2012 The gocql Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package gocql
6
7import (
8	"context"
9	"errors"
10	"fmt"
11	"io"
12	"io/ioutil"
13	"net"
14	"runtime"
15	"strings"
16	"time"
17)
18
19type unsetColumn struct{}
20
21// UnsetValue represents a value used in a query binding that will be ignored by Cassandra.
22//
23// By setting a field to the unset value Cassandra will ignore the write completely.
24// The main advantage is the ability to keep the same prepared statement even when you don't
25// want to update some fields, where before you needed to make another prepared statement.
26//
27// UnsetValue is only available when using the version 4 of the protocol.
28var UnsetValue = unsetColumn{}
29
30type namedValue struct {
31	name  string
32	value interface{}
33}
34
35// NamedValue produce a value which will bind to the named parameter in a query
36func NamedValue(name string, value interface{}) interface{} {
37	return &namedValue{
38		name:  name,
39		value: value,
40	}
41}
42
43const (
44	protoDirectionMask = 0x80
45	protoVersionMask   = 0x7F
46	protoVersion1      = 0x01
47	protoVersion2      = 0x02
48	protoVersion3      = 0x03
49	protoVersion4      = 0x04
50	protoVersion5      = 0x05
51
52	maxFrameSize = 256 * 1024 * 1024
53)
54
55type protoVersion byte
56
57func (p protoVersion) request() bool {
58	return p&protoDirectionMask == 0x00
59}
60
61func (p protoVersion) response() bool {
62	return p&protoDirectionMask == 0x80
63}
64
65func (p protoVersion) version() byte {
66	return byte(p) & protoVersionMask
67}
68
69func (p protoVersion) String() string {
70	dir := "REQ"
71	if p.response() {
72		dir = "RESP"
73	}
74
75	return fmt.Sprintf("[version=%d direction=%s]", p.version(), dir)
76}
77
78type frameOp byte
79
80const (
81	// header ops
82	opError         frameOp = 0x00
83	opStartup       frameOp = 0x01
84	opReady         frameOp = 0x02
85	opAuthenticate  frameOp = 0x03
86	opOptions       frameOp = 0x05
87	opSupported     frameOp = 0x06
88	opQuery         frameOp = 0x07
89	opResult        frameOp = 0x08
90	opPrepare       frameOp = 0x09
91	opExecute       frameOp = 0x0A
92	opRegister      frameOp = 0x0B
93	opEvent         frameOp = 0x0C
94	opBatch         frameOp = 0x0D
95	opAuthChallenge frameOp = 0x0E
96	opAuthResponse  frameOp = 0x0F
97	opAuthSuccess   frameOp = 0x10
98)
99
100func (f frameOp) String() string {
101	switch f {
102	case opError:
103		return "ERROR"
104	case opStartup:
105		return "STARTUP"
106	case opReady:
107		return "READY"
108	case opAuthenticate:
109		return "AUTHENTICATE"
110	case opOptions:
111		return "OPTIONS"
112	case opSupported:
113		return "SUPPORTED"
114	case opQuery:
115		return "QUERY"
116	case opResult:
117		return "RESULT"
118	case opPrepare:
119		return "PREPARE"
120	case opExecute:
121		return "EXECUTE"
122	case opRegister:
123		return "REGISTER"
124	case opEvent:
125		return "EVENT"
126	case opBatch:
127		return "BATCH"
128	case opAuthChallenge:
129		return "AUTH_CHALLENGE"
130	case opAuthResponse:
131		return "AUTH_RESPONSE"
132	case opAuthSuccess:
133		return "AUTH_SUCCESS"
134	default:
135		return fmt.Sprintf("UNKNOWN_OP_%d", f)
136	}
137}
138
139const (
140	// result kind
141	resultKindVoid          = 1
142	resultKindRows          = 2
143	resultKindKeyspace      = 3
144	resultKindPrepared      = 4
145	resultKindSchemaChanged = 5
146
147	// rows flags
148	flagGlobalTableSpec int = 0x01
149	flagHasMorePages    int = 0x02
150	flagNoMetaData      int = 0x04
151
152	// query flags
153	flagValues                byte = 0x01
154	flagSkipMetaData          byte = 0x02
155	flagPageSize              byte = 0x04
156	flagWithPagingState       byte = 0x08
157	flagWithSerialConsistency byte = 0x10
158	flagDefaultTimestamp      byte = 0x20
159	flagWithNameValues        byte = 0x40
160	flagWithKeyspace          byte = 0x80
161
162	// prepare flags
163	flagWithPreparedKeyspace uint32 = 0x01
164
165	// header flags
166	flagCompress      byte = 0x01
167	flagTracing       byte = 0x02
168	flagCustomPayload byte = 0x04
169	flagWarning       byte = 0x08
170	flagBetaProtocol  byte = 0x10
171)
172
173type Consistency uint16
174
175const (
176	Any         Consistency = 0x00
177	One         Consistency = 0x01
178	Two         Consistency = 0x02
179	Three       Consistency = 0x03
180	Quorum      Consistency = 0x04
181	All         Consistency = 0x05
182	LocalQuorum Consistency = 0x06
183	EachQuorum  Consistency = 0x07
184	LocalOne    Consistency = 0x0A
185)
186
187func (c Consistency) String() string {
188	switch c {
189	case Any:
190		return "ANY"
191	case One:
192		return "ONE"
193	case Two:
194		return "TWO"
195	case Three:
196		return "THREE"
197	case Quorum:
198		return "QUORUM"
199	case All:
200		return "ALL"
201	case LocalQuorum:
202		return "LOCAL_QUORUM"
203	case EachQuorum:
204		return "EACH_QUORUM"
205	case LocalOne:
206		return "LOCAL_ONE"
207	default:
208		return fmt.Sprintf("UNKNOWN_CONS_0x%x", uint16(c))
209	}
210}
211
212func (c Consistency) MarshalText() (text []byte, err error) {
213	return []byte(c.String()), nil
214}
215
216func (c *Consistency) UnmarshalText(text []byte) error {
217	switch string(text) {
218	case "ANY":
219		*c = Any
220	case "ONE":
221		*c = One
222	case "TWO":
223		*c = Two
224	case "THREE":
225		*c = Three
226	case "QUORUM":
227		*c = Quorum
228	case "ALL":
229		*c = All
230	case "LOCAL_QUORUM":
231		*c = LocalQuorum
232	case "EACH_QUORUM":
233		*c = EachQuorum
234	case "LOCAL_ONE":
235		*c = LocalOne
236	default:
237		return fmt.Errorf("invalid consistency %q", string(text))
238	}
239
240	return nil
241}
242
243func ParseConsistency(s string) Consistency {
244	var c Consistency
245	if err := c.UnmarshalText([]byte(strings.ToUpper(s))); err != nil {
246		panic(err)
247	}
248	return c
249}
250
251// ParseConsistencyWrapper wraps gocql.ParseConsistency to provide an err
252// return instead of a panic
253func ParseConsistencyWrapper(s string) (consistency Consistency, err error) {
254	err = consistency.UnmarshalText([]byte(strings.ToUpper(s)))
255	return
256}
257
258// MustParseConsistency is the same as ParseConsistency except it returns
259// an error (never). It is kept here since breaking changes are not good.
260// DEPRECATED: use ParseConsistency if you want a panic on parse error.
261func MustParseConsistency(s string) (Consistency, error) {
262	c, err := ParseConsistencyWrapper(s)
263	if err != nil {
264		panic(err)
265	}
266	return c, nil
267}
268
269type SerialConsistency uint16
270
271const (
272	Serial      SerialConsistency = 0x08
273	LocalSerial SerialConsistency = 0x09
274)
275
276func (s SerialConsistency) String() string {
277	switch s {
278	case Serial:
279		return "SERIAL"
280	case LocalSerial:
281		return "LOCAL_SERIAL"
282	default:
283		return fmt.Sprintf("UNKNOWN_SERIAL_CONS_0x%x", uint16(s))
284	}
285}
286
287func (s SerialConsistency) MarshalText() (text []byte, err error) {
288	return []byte(s.String()), nil
289}
290
291func (s *SerialConsistency) UnmarshalText(text []byte) error {
292	switch string(text) {
293	case "SERIAL":
294		*s = Serial
295	case "LOCAL_SERIAL":
296		*s = LocalSerial
297	default:
298		return fmt.Errorf("invalid consistency %q", string(text))
299	}
300
301	return nil
302}
303
304const (
305	apacheCassandraTypePrefix = "org.apache.cassandra.db.marshal."
306)
307
308var (
309	ErrFrameTooBig = errors.New("frame length is bigger than the maximum allowed")
310)
311
312const maxFrameHeaderSize = 9
313
314func writeInt(p []byte, n int32) {
315	p[0] = byte(n >> 24)
316	p[1] = byte(n >> 16)
317	p[2] = byte(n >> 8)
318	p[3] = byte(n)
319}
320
321func readInt(p []byte) int32 {
322	return int32(p[0])<<24 | int32(p[1])<<16 | int32(p[2])<<8 | int32(p[3])
323}
324
325func writeShort(p []byte, n uint16) {
326	p[0] = byte(n >> 8)
327	p[1] = byte(n)
328}
329
330func readShort(p []byte) uint16 {
331	return uint16(p[0])<<8 | uint16(p[1])
332}
333
334type frameHeader struct {
335	version  protoVersion
336	flags    byte
337	stream   int
338	op       frameOp
339	length   int
340	warnings []string
341}
342
343func (f frameHeader) String() string {
344	return fmt.Sprintf("[header version=%s flags=0x%x stream=%d op=%s length=%d]", f.version, f.flags, f.stream, f.op, f.length)
345}
346
347func (f frameHeader) Header() frameHeader {
348	return f
349}
350
351const defaultBufSize = 128
352
353type ObservedFrameHeader struct {
354	Version protoVersion
355	Flags   byte
356	Stream  int16
357	Opcode  frameOp
358	Length  int32
359
360	// StartHeader is the time we started reading the frame header off the network connection.
361	Start time.Time
362	// EndHeader is the time we finished reading the frame header off the network connection.
363	End time.Time
364}
365
366func (f ObservedFrameHeader) String() string {
367	return fmt.Sprintf("[observed header version=%s flags=0x%x stream=%d op=%s length=%d]", f.Version, f.Flags, f.Stream, f.Opcode, f.Length)
368}
369
370// FrameHeaderObserver is the interface implemented by frame observers / stat collectors.
371//
372// Experimental, this interface and use may change
373type FrameHeaderObserver interface {
374	// ObserveFrameHeader gets called on every received frame header.
375	ObserveFrameHeader(context.Context, ObservedFrameHeader)
376}
377
378// a framer is responsible for reading, writing and parsing frames on a single stream
379type framer struct {
380	r io.Reader
381	w io.Writer
382
383	proto byte
384	// flags are for outgoing flags, enabling compression and tracing etc
385	flags    byte
386	compres  Compressor
387	headSize int
388	// if this frame was read then the header will be here
389	header *frameHeader
390
391	// if tracing flag is set this is not nil
392	traceID []byte
393
394	// holds a ref to the whole byte slice for rbuf so that it can be reset to
395	// 0 after a read.
396	readBuffer []byte
397
398	rbuf []byte
399	wbuf []byte
400
401	customPayload map[string][]byte
402}
403
404func newFramer(r io.Reader, w io.Writer, compressor Compressor, version byte) *framer {
405	f := &framer{
406		wbuf:       make([]byte, defaultBufSize),
407		readBuffer: make([]byte, defaultBufSize),
408	}
409	var flags byte
410	if compressor != nil {
411		flags |= flagCompress
412	}
413	if version == protoVersion5 {
414		flags |= flagBetaProtocol
415	}
416
417	version &= protoVersionMask
418
419	headSize := 8
420	if version > protoVersion2 {
421		headSize = 9
422	}
423
424	f.compres = compressor
425	f.proto = version
426	f.flags = flags
427	f.headSize = headSize
428
429	f.r = r
430	f.rbuf = f.readBuffer[:0]
431
432	f.w = w
433	f.wbuf = f.wbuf[:0]
434
435	f.header = nil
436	f.traceID = nil
437
438	return f
439}
440
441type frame interface {
442	Header() frameHeader
443}
444
445func readHeader(r io.Reader, p []byte) (head frameHeader, err error) {
446	_, err = io.ReadFull(r, p[:1])
447	if err != nil {
448		return frameHeader{}, err
449	}
450
451	version := p[0] & protoVersionMask
452
453	if version < protoVersion1 || version > protoVersion5 {
454		return frameHeader{}, fmt.Errorf("gocql: unsupported protocol response version: %d", version)
455	}
456
457	headSize := 9
458	if version < protoVersion3 {
459		headSize = 8
460	}
461
462	_, err = io.ReadFull(r, p[1:headSize])
463	if err != nil {
464		return frameHeader{}, err
465	}
466
467	p = p[:headSize]
468
469	head.version = protoVersion(p[0])
470	head.flags = p[1]
471
472	if version > protoVersion2 {
473		if len(p) != 9 {
474			return frameHeader{}, fmt.Errorf("not enough bytes to read header require 9 got: %d", len(p))
475		}
476
477		head.stream = int(int16(p[2])<<8 | int16(p[3]))
478		head.op = frameOp(p[4])
479		head.length = int(readInt(p[5:]))
480	} else {
481		if len(p) != 8 {
482			return frameHeader{}, fmt.Errorf("not enough bytes to read header require 8 got: %d", len(p))
483		}
484
485		head.stream = int(int8(p[2]))
486		head.op = frameOp(p[3])
487		head.length = int(readInt(p[4:]))
488	}
489
490	return head, nil
491}
492
493// explicitly enables tracing for the framers outgoing requests
494func (f *framer) trace() {
495	f.flags |= flagTracing
496}
497
498// explicitly enables the custom payload flag
499func (f *framer) payload() {
500	f.flags |= flagCustomPayload
501}
502
503// reads a frame form the wire into the framers buffer
504func (f *framer) readFrame(head *frameHeader) error {
505	if head.length < 0 {
506		return fmt.Errorf("frame body length can not be less than 0: %d", head.length)
507	} else if head.length > maxFrameSize {
508		// need to free up the connection to be used again
509		_, err := io.CopyN(ioutil.Discard, f.r, int64(head.length))
510		if err != nil {
511			return fmt.Errorf("error whilst trying to discard frame with invalid length: %v", err)
512		}
513		return ErrFrameTooBig
514	}
515
516	if cap(f.readBuffer) >= head.length {
517		f.rbuf = f.readBuffer[:head.length]
518	} else {
519		f.readBuffer = make([]byte, head.length)
520		f.rbuf = f.readBuffer
521	}
522
523	// assume the underlying reader takes care of timeouts and retries
524	n, err := io.ReadFull(f.r, f.rbuf)
525	if err != nil {
526		return fmt.Errorf("unable to read frame body: read %d/%d bytes: %v", n, head.length, err)
527	}
528
529	if head.flags&flagCompress == flagCompress {
530		if f.compres == nil {
531			return NewErrProtocol("no compressor available with compressed frame body")
532		}
533
534		f.rbuf, err = f.compres.Decode(f.rbuf)
535		if err != nil {
536			return err
537		}
538	}
539
540	f.header = head
541	return nil
542}
543
544func (f *framer) parseFrame() (frame frame, err error) {
545	defer func() {
546		if r := recover(); r != nil {
547			if _, ok := r.(runtime.Error); ok {
548				panic(r)
549			}
550			err = r.(error)
551		}
552	}()
553
554	if f.header.version.request() {
555		return nil, NewErrProtocol("got a request frame from server: %v", f.header.version)
556	}
557
558	if f.header.flags&flagTracing == flagTracing {
559		f.readTrace()
560	}
561
562	if f.header.flags&flagWarning == flagWarning {
563		f.header.warnings = f.readStringList()
564	}
565
566	if f.header.flags&flagCustomPayload == flagCustomPayload {
567		f.customPayload = f.readBytesMap()
568	}
569
570	// assumes that the frame body has been read into rbuf
571	switch f.header.op {
572	case opError:
573		frame = f.parseErrorFrame()
574	case opReady:
575		frame = f.parseReadyFrame()
576	case opResult:
577		frame, err = f.parseResultFrame()
578	case opSupported:
579		frame = f.parseSupportedFrame()
580	case opAuthenticate:
581		frame = f.parseAuthenticateFrame()
582	case opAuthChallenge:
583		frame = f.parseAuthChallengeFrame()
584	case opAuthSuccess:
585		frame = f.parseAuthSuccessFrame()
586	case opEvent:
587		frame = f.parseEventFrame()
588	default:
589		return nil, NewErrProtocol("unknown op in frame header: %s", f.header.op)
590	}
591
592	return
593}
594
595func (f *framer) parseErrorFrame() frame {
596	code := f.readInt()
597	msg := f.readString()
598
599	errD := errorFrame{
600		frameHeader: *f.header,
601		code:        code,
602		message:     msg,
603	}
604
605	switch code {
606	case errUnavailable:
607		cl := f.readConsistency()
608		required := f.readInt()
609		alive := f.readInt()
610		return &RequestErrUnavailable{
611			errorFrame:  errD,
612			Consistency: cl,
613			Required:    required,
614			Alive:       alive,
615		}
616	case errWriteTimeout:
617		cl := f.readConsistency()
618		received := f.readInt()
619		blockfor := f.readInt()
620		writeType := f.readString()
621		return &RequestErrWriteTimeout{
622			errorFrame:  errD,
623			Consistency: cl,
624			Received:    received,
625			BlockFor:    blockfor,
626			WriteType:   writeType,
627		}
628	case errReadTimeout:
629		cl := f.readConsistency()
630		received := f.readInt()
631		blockfor := f.readInt()
632		dataPresent := f.readByte()
633		return &RequestErrReadTimeout{
634			errorFrame:  errD,
635			Consistency: cl,
636			Received:    received,
637			BlockFor:    blockfor,
638			DataPresent: dataPresent,
639		}
640	case errAlreadyExists:
641		ks := f.readString()
642		table := f.readString()
643		return &RequestErrAlreadyExists{
644			errorFrame: errD,
645			Keyspace:   ks,
646			Table:      table,
647		}
648	case errUnprepared:
649		stmtId := f.readShortBytes()
650		return &RequestErrUnprepared{
651			errorFrame:  errD,
652			StatementId: copyBytes(stmtId), // defensively copy
653		}
654	case errReadFailure:
655		res := &RequestErrReadFailure{
656			errorFrame: errD,
657		}
658		res.Consistency = f.readConsistency()
659		res.Received = f.readInt()
660		res.BlockFor = f.readInt()
661		if f.proto > protoVersion4 {
662			res.ErrorMap = f.readErrorMap()
663			res.NumFailures = len(res.ErrorMap)
664		} else {
665			res.NumFailures = f.readInt()
666		}
667		res.DataPresent = f.readByte() != 0
668
669		return res
670	case errWriteFailure:
671		res := &RequestErrWriteFailure{
672			errorFrame: errD,
673		}
674		res.Consistency = f.readConsistency()
675		res.Received = f.readInt()
676		res.BlockFor = f.readInt()
677		if f.proto > protoVersion4 {
678			res.ErrorMap = f.readErrorMap()
679			res.NumFailures = len(res.ErrorMap)
680		} else {
681			res.NumFailures = f.readInt()
682		}
683		res.WriteType = f.readString()
684		return res
685	case errFunctionFailure:
686		res := &RequestErrFunctionFailure{
687			errorFrame: errD,
688		}
689		res.Keyspace = f.readString()
690		res.Function = f.readString()
691		res.ArgTypes = f.readStringList()
692		return res
693
694	case errCDCWriteFailure:
695		res := &RequestErrCDCWriteFailure{
696			errorFrame: errD,
697		}
698		return res
699
700	case errInvalid, errBootstrapping, errConfig, errCredentials, errOverloaded,
701		errProtocol, errServer, errSyntax, errTruncate, errUnauthorized:
702		// TODO(zariel): we should have some distinct types for these errors
703		return errD
704	default:
705		panic(fmt.Errorf("unknown error code: 0x%x", errD.code))
706	}
707}
708
709func (f *framer) readErrorMap() (errMap ErrorMap) {
710	errMap = make(ErrorMap)
711	numErrs := f.readInt()
712	for i := 0; i < numErrs; i++ {
713		ip := f.readInetAdressOnly().String()
714		errMap[ip] = f.readShort()
715	}
716	return
717}
718
719func (f *framer) writeHeader(flags byte, op frameOp, stream int) {
720	f.wbuf = f.wbuf[:0]
721	f.wbuf = append(f.wbuf,
722		f.proto,
723		flags,
724	)
725
726	if f.proto > protoVersion2 {
727		f.wbuf = append(f.wbuf,
728			byte(stream>>8),
729			byte(stream),
730		)
731	} else {
732		f.wbuf = append(f.wbuf,
733			byte(stream),
734		)
735	}
736
737	// pad out length
738	f.wbuf = append(f.wbuf,
739		byte(op),
740		0,
741		0,
742		0,
743		0,
744	)
745}
746
747func (f *framer) setLength(length int) {
748	p := 4
749	if f.proto > protoVersion2 {
750		p = 5
751	}
752
753	f.wbuf[p+0] = byte(length >> 24)
754	f.wbuf[p+1] = byte(length >> 16)
755	f.wbuf[p+2] = byte(length >> 8)
756	f.wbuf[p+3] = byte(length)
757}
758
759func (f *framer) finishWrite() error {
760	if len(f.wbuf) > maxFrameSize {
761		// huge app frame, lets remove it so it doesn't bloat the heap
762		f.wbuf = make([]byte, defaultBufSize)
763		return ErrFrameTooBig
764	}
765
766	if f.wbuf[1]&flagCompress == flagCompress {
767		if f.compres == nil {
768			panic("compress flag set with no compressor")
769		}
770
771		// TODO: only compress frames which are big enough
772		compressed, err := f.compres.Encode(f.wbuf[f.headSize:])
773		if err != nil {
774			return err
775		}
776
777		f.wbuf = append(f.wbuf[:f.headSize], compressed...)
778	}
779	length := len(f.wbuf) - f.headSize
780	f.setLength(length)
781
782	_, err := f.w.Write(f.wbuf)
783	if err != nil {
784		return err
785	}
786
787	return nil
788}
789
790func (f *framer) readTrace() {
791	f.traceID = f.readUUID().Bytes()
792}
793
794type readyFrame struct {
795	frameHeader
796}
797
798func (f *framer) parseReadyFrame() frame {
799	return &readyFrame{
800		frameHeader: *f.header,
801	}
802}
803
804type supportedFrame struct {
805	frameHeader
806
807	supported map[string][]string
808}
809
810// TODO: if we move the body buffer onto the frameHeader then we only need a single
811// framer, and can move the methods onto the header.
812func (f *framer) parseSupportedFrame() frame {
813	return &supportedFrame{
814		frameHeader: *f.header,
815
816		supported: f.readStringMultiMap(),
817	}
818}
819
820type writeStartupFrame struct {
821	opts map[string]string
822}
823
824func (w writeStartupFrame) String() string {
825	return fmt.Sprintf("[startup opts=%+v]", w.opts)
826}
827
828func (w *writeStartupFrame) writeFrame(f *framer, streamID int) error {
829	f.writeHeader(f.flags&^flagCompress, opStartup, streamID)
830	f.writeStringMap(w.opts)
831
832	return f.finishWrite()
833}
834
835type writePrepareFrame struct {
836	statement     string
837	keyspace      string
838	customPayload map[string][]byte
839}
840
841func (w *writePrepareFrame) writeFrame(f *framer, streamID int) error {
842	if len(w.customPayload) > 0 {
843		f.payload()
844	}
845	f.writeHeader(f.flags, opPrepare, streamID)
846	f.writeCustomPayload(&w.customPayload)
847	f.writeLongString(w.statement)
848
849	var flags uint32 = 0
850	if w.keyspace != "" {
851		if f.proto > protoVersion4 {
852			flags |= flagWithPreparedKeyspace
853		} else {
854			panic(fmt.Errorf("The keyspace can only be set with protocol 5 or higher"))
855		}
856	}
857	if f.proto > protoVersion4 {
858		f.writeUint(flags)
859	}
860	if w.keyspace != "" {
861		f.writeString(w.keyspace)
862	}
863
864	return f.finishWrite()
865}
866
867func (f *framer) readTypeInfo() TypeInfo {
868	// TODO: factor this out so the same code paths can be used to parse custom
869	// types and other types, as much of the logic will be duplicated.
870	id := f.readShort()
871
872	simple := NativeType{
873		proto: f.proto,
874		typ:   Type(id),
875	}
876
877	if simple.typ == TypeCustom {
878		simple.custom = f.readString()
879		if cassType := getApacheCassandraType(simple.custom); cassType != TypeCustom {
880			simple.typ = cassType
881		}
882	}
883
884	switch simple.typ {
885	case TypeTuple:
886		n := f.readShort()
887		tuple := TupleTypeInfo{
888			NativeType: simple,
889			Elems:      make([]TypeInfo, n),
890		}
891
892		for i := 0; i < int(n); i++ {
893			tuple.Elems[i] = f.readTypeInfo()
894		}
895
896		return tuple
897
898	case TypeUDT:
899		udt := UDTTypeInfo{
900			NativeType: simple,
901		}
902		udt.KeySpace = f.readString()
903		udt.Name = f.readString()
904
905		n := f.readShort()
906		udt.Elements = make([]UDTField, n)
907		for i := 0; i < int(n); i++ {
908			field := &udt.Elements[i]
909			field.Name = f.readString()
910			field.Type = f.readTypeInfo()
911		}
912
913		return udt
914	case TypeMap, TypeList, TypeSet:
915		collection := CollectionType{
916			NativeType: simple,
917		}
918
919		if simple.typ == TypeMap {
920			collection.Key = f.readTypeInfo()
921		}
922
923		collection.Elem = f.readTypeInfo()
924
925		return collection
926	}
927
928	return simple
929}
930
931type preparedMetadata struct {
932	resultMetadata
933
934	// proto v4+
935	pkeyColumns []int
936}
937
938func (r preparedMetadata) String() string {
939	return fmt.Sprintf("[prepared flags=0x%x pkey=%v paging_state=% X columns=%v col_count=%d actual_col_count=%d]", r.flags, r.pkeyColumns, r.pagingState, r.columns, r.colCount, r.actualColCount)
940}
941
942func (f *framer) parsePreparedMetadata() preparedMetadata {
943	// TODO: deduplicate this from parseMetadata
944	meta := preparedMetadata{}
945
946	meta.flags = f.readInt()
947	meta.colCount = f.readInt()
948	if meta.colCount < 0 {
949		panic(fmt.Errorf("received negative column count: %d", meta.colCount))
950	}
951	meta.actualColCount = meta.colCount
952
953	if f.proto >= protoVersion4 {
954		pkeyCount := f.readInt()
955		pkeys := make([]int, pkeyCount)
956		for i := 0; i < pkeyCount; i++ {
957			pkeys[i] = int(f.readShort())
958		}
959		meta.pkeyColumns = pkeys
960	}
961
962	if meta.flags&flagHasMorePages == flagHasMorePages {
963		meta.pagingState = copyBytes(f.readBytes())
964	}
965
966	if meta.flags&flagNoMetaData == flagNoMetaData {
967		return meta
968	}
969
970	var keyspace, table string
971	globalSpec := meta.flags&flagGlobalTableSpec == flagGlobalTableSpec
972	if globalSpec {
973		keyspace = f.readString()
974		table = f.readString()
975	}
976
977	var cols []ColumnInfo
978	if meta.colCount < 1000 {
979		// preallocate columninfo to avoid excess copying
980		cols = make([]ColumnInfo, meta.colCount)
981		for i := 0; i < meta.colCount; i++ {
982			f.readCol(&cols[i], &meta.resultMetadata, globalSpec, keyspace, table)
983		}
984	} else {
985		// use append, huge number of columns usually indicates a corrupt frame or
986		// just a huge row.
987		for i := 0; i < meta.colCount; i++ {
988			var col ColumnInfo
989			f.readCol(&col, &meta.resultMetadata, globalSpec, keyspace, table)
990			cols = append(cols, col)
991		}
992	}
993
994	meta.columns = cols
995
996	return meta
997}
998
999type resultMetadata struct {
1000	flags int
1001
1002	// only if flagPageState
1003	pagingState []byte
1004
1005	columns  []ColumnInfo
1006	colCount int
1007
1008	// this is a count of the total number of columns which can be scanned,
1009	// it is at minimum len(columns) but may be larger, for instance when a column
1010	// is a UDT or tuple.
1011	actualColCount int
1012}
1013
1014func (r *resultMetadata) morePages() bool {
1015	return r.flags&flagHasMorePages == flagHasMorePages
1016}
1017
1018func (r resultMetadata) String() string {
1019	return fmt.Sprintf("[metadata flags=0x%x paging_state=% X columns=%v]", r.flags, r.pagingState, r.columns)
1020}
1021
1022func (f *framer) readCol(col *ColumnInfo, meta *resultMetadata, globalSpec bool, keyspace, table string) {
1023	if !globalSpec {
1024		col.Keyspace = f.readString()
1025		col.Table = f.readString()
1026	} else {
1027		col.Keyspace = keyspace
1028		col.Table = table
1029	}
1030
1031	col.Name = f.readString()
1032	col.TypeInfo = f.readTypeInfo()
1033	switch v := col.TypeInfo.(type) {
1034	// maybe also UDT
1035	case TupleTypeInfo:
1036		// -1 because we already included the tuple column
1037		meta.actualColCount += len(v.Elems) - 1
1038	}
1039}
1040
1041func (f *framer) parseResultMetadata() resultMetadata {
1042	var meta resultMetadata
1043
1044	meta.flags = f.readInt()
1045	meta.colCount = f.readInt()
1046	if meta.colCount < 0 {
1047		panic(fmt.Errorf("received negative column count: %d", meta.colCount))
1048	}
1049	meta.actualColCount = meta.colCount
1050
1051	if meta.flags&flagHasMorePages == flagHasMorePages {
1052		meta.pagingState = copyBytes(f.readBytes())
1053	}
1054
1055	if meta.flags&flagNoMetaData == flagNoMetaData {
1056		return meta
1057	}
1058
1059	var keyspace, table string
1060	globalSpec := meta.flags&flagGlobalTableSpec == flagGlobalTableSpec
1061	if globalSpec {
1062		keyspace = f.readString()
1063		table = f.readString()
1064	}
1065
1066	var cols []ColumnInfo
1067	if meta.colCount < 1000 {
1068		// preallocate columninfo to avoid excess copying
1069		cols = make([]ColumnInfo, meta.colCount)
1070		for i := 0; i < meta.colCount; i++ {
1071			f.readCol(&cols[i], &meta, globalSpec, keyspace, table)
1072		}
1073
1074	} else {
1075		// use append, huge number of columns usually indicates a corrupt frame or
1076		// just a huge row.
1077		for i := 0; i < meta.colCount; i++ {
1078			var col ColumnInfo
1079			f.readCol(&col, &meta, globalSpec, keyspace, table)
1080			cols = append(cols, col)
1081		}
1082	}
1083
1084	meta.columns = cols
1085
1086	return meta
1087}
1088
1089type resultVoidFrame struct {
1090	frameHeader
1091}
1092
1093func (f *resultVoidFrame) String() string {
1094	return "[result_void]"
1095}
1096
1097func (f *framer) parseResultFrame() (frame, error) {
1098	kind := f.readInt()
1099
1100	switch kind {
1101	case resultKindVoid:
1102		return &resultVoidFrame{frameHeader: *f.header}, nil
1103	case resultKindRows:
1104		return f.parseResultRows(), nil
1105	case resultKindKeyspace:
1106		return f.parseResultSetKeyspace(), nil
1107	case resultKindPrepared:
1108		return f.parseResultPrepared(), nil
1109	case resultKindSchemaChanged:
1110		return f.parseResultSchemaChange(), nil
1111	}
1112
1113	return nil, NewErrProtocol("unknown result kind: %x", kind)
1114}
1115
1116type resultRowsFrame struct {
1117	frameHeader
1118
1119	meta resultMetadata
1120	// dont parse the rows here as we only need to do it once
1121	numRows int
1122}
1123
1124func (f *resultRowsFrame) String() string {
1125	return fmt.Sprintf("[result_rows meta=%v]", f.meta)
1126}
1127
1128func (f *framer) parseResultRows() frame {
1129	result := &resultRowsFrame{}
1130	result.meta = f.parseResultMetadata()
1131
1132	result.numRows = f.readInt()
1133	if result.numRows < 0 {
1134		panic(fmt.Errorf("invalid row_count in result frame: %d", result.numRows))
1135	}
1136
1137	return result
1138}
1139
1140type resultKeyspaceFrame struct {
1141	frameHeader
1142	keyspace string
1143}
1144
1145func (r *resultKeyspaceFrame) String() string {
1146	return fmt.Sprintf("[result_keyspace keyspace=%s]", r.keyspace)
1147}
1148
1149func (f *framer) parseResultSetKeyspace() frame {
1150	return &resultKeyspaceFrame{
1151		frameHeader: *f.header,
1152		keyspace:    f.readString(),
1153	}
1154}
1155
1156type resultPreparedFrame struct {
1157	frameHeader
1158
1159	preparedID []byte
1160	reqMeta    preparedMetadata
1161	respMeta   resultMetadata
1162}
1163
1164func (f *framer) parseResultPrepared() frame {
1165	frame := &resultPreparedFrame{
1166		frameHeader: *f.header,
1167		preparedID:  f.readShortBytes(),
1168		reqMeta:     f.parsePreparedMetadata(),
1169	}
1170
1171	if f.proto < protoVersion2 {
1172		return frame
1173	}
1174
1175	frame.respMeta = f.parseResultMetadata()
1176
1177	return frame
1178}
1179
1180type schemaChangeKeyspace struct {
1181	frameHeader
1182
1183	change   string
1184	keyspace string
1185}
1186
1187func (f schemaChangeKeyspace) String() string {
1188	return fmt.Sprintf("[event schema_change_keyspace change=%q keyspace=%q]", f.change, f.keyspace)
1189}
1190
1191type schemaChangeTable struct {
1192	frameHeader
1193
1194	change   string
1195	keyspace string
1196	object   string
1197}
1198
1199func (f schemaChangeTable) String() string {
1200	return fmt.Sprintf("[event schema_change change=%q keyspace=%q object=%q]", f.change, f.keyspace, f.object)
1201}
1202
1203type schemaChangeType struct {
1204	frameHeader
1205
1206	change   string
1207	keyspace string
1208	object   string
1209}
1210
1211type schemaChangeFunction struct {
1212	frameHeader
1213
1214	change   string
1215	keyspace string
1216	name     string
1217	args     []string
1218}
1219
1220type schemaChangeAggregate struct {
1221	frameHeader
1222
1223	change   string
1224	keyspace string
1225	name     string
1226	args     []string
1227}
1228
1229func (f *framer) parseResultSchemaChange() frame {
1230	if f.proto <= protoVersion2 {
1231		change := f.readString()
1232		keyspace := f.readString()
1233		table := f.readString()
1234
1235		if table != "" {
1236			return &schemaChangeTable{
1237				frameHeader: *f.header,
1238				change:      change,
1239				keyspace:    keyspace,
1240				object:      table,
1241			}
1242		} else {
1243			return &schemaChangeKeyspace{
1244				frameHeader: *f.header,
1245				change:      change,
1246				keyspace:    keyspace,
1247			}
1248		}
1249	} else {
1250		change := f.readString()
1251		target := f.readString()
1252
1253		// TODO: could just use a separate type for each target
1254		switch target {
1255		case "KEYSPACE":
1256			frame := &schemaChangeKeyspace{
1257				frameHeader: *f.header,
1258				change:      change,
1259			}
1260
1261			frame.keyspace = f.readString()
1262
1263			return frame
1264		case "TABLE":
1265			frame := &schemaChangeTable{
1266				frameHeader: *f.header,
1267				change:      change,
1268			}
1269
1270			frame.keyspace = f.readString()
1271			frame.object = f.readString()
1272
1273			return frame
1274		case "TYPE":
1275			frame := &schemaChangeType{
1276				frameHeader: *f.header,
1277				change:      change,
1278			}
1279
1280			frame.keyspace = f.readString()
1281			frame.object = f.readString()
1282
1283			return frame
1284		case "FUNCTION":
1285			frame := &schemaChangeFunction{
1286				frameHeader: *f.header,
1287				change:      change,
1288			}
1289
1290			frame.keyspace = f.readString()
1291			frame.name = f.readString()
1292			frame.args = f.readStringList()
1293
1294			return frame
1295		case "AGGREGATE":
1296			frame := &schemaChangeAggregate{
1297				frameHeader: *f.header,
1298				change:      change,
1299			}
1300
1301			frame.keyspace = f.readString()
1302			frame.name = f.readString()
1303			frame.args = f.readStringList()
1304
1305			return frame
1306		default:
1307			panic(fmt.Errorf("gocql: unknown SCHEMA_CHANGE target: %q change: %q", target, change))
1308		}
1309	}
1310
1311}
1312
1313type authenticateFrame struct {
1314	frameHeader
1315
1316	class string
1317}
1318
1319func (a *authenticateFrame) String() string {
1320	return fmt.Sprintf("[authenticate class=%q]", a.class)
1321}
1322
1323func (f *framer) parseAuthenticateFrame() frame {
1324	return &authenticateFrame{
1325		frameHeader: *f.header,
1326		class:       f.readString(),
1327	}
1328}
1329
1330type authSuccessFrame struct {
1331	frameHeader
1332
1333	data []byte
1334}
1335
1336func (a *authSuccessFrame) String() string {
1337	return fmt.Sprintf("[auth_success data=%q]", a.data)
1338}
1339
1340func (f *framer) parseAuthSuccessFrame() frame {
1341	return &authSuccessFrame{
1342		frameHeader: *f.header,
1343		data:        f.readBytes(),
1344	}
1345}
1346
1347type authChallengeFrame struct {
1348	frameHeader
1349
1350	data []byte
1351}
1352
1353func (a *authChallengeFrame) String() string {
1354	return fmt.Sprintf("[auth_challenge data=%q]", a.data)
1355}
1356
1357func (f *framer) parseAuthChallengeFrame() frame {
1358	return &authChallengeFrame{
1359		frameHeader: *f.header,
1360		data:        f.readBytes(),
1361	}
1362}
1363
1364type statusChangeEventFrame struct {
1365	frameHeader
1366
1367	change string
1368	host   net.IP
1369	port   int
1370}
1371
1372func (t statusChangeEventFrame) String() string {
1373	return fmt.Sprintf("[status_change change=%s host=%v port=%v]", t.change, t.host, t.port)
1374}
1375
1376// essentially the same as statusChange
1377type topologyChangeEventFrame struct {
1378	frameHeader
1379
1380	change string
1381	host   net.IP
1382	port   int
1383}
1384
1385func (t topologyChangeEventFrame) String() string {
1386	return fmt.Sprintf("[topology_change change=%s host=%v port=%v]", t.change, t.host, t.port)
1387}
1388
1389func (f *framer) parseEventFrame() frame {
1390	eventType := f.readString()
1391
1392	switch eventType {
1393	case "TOPOLOGY_CHANGE":
1394		frame := &topologyChangeEventFrame{frameHeader: *f.header}
1395		frame.change = f.readString()
1396		frame.host, frame.port = f.readInet()
1397
1398		return frame
1399	case "STATUS_CHANGE":
1400		frame := &statusChangeEventFrame{frameHeader: *f.header}
1401		frame.change = f.readString()
1402		frame.host, frame.port = f.readInet()
1403
1404		return frame
1405	case "SCHEMA_CHANGE":
1406		// this should work for all versions
1407		return f.parseResultSchemaChange()
1408	default:
1409		panic(fmt.Errorf("gocql: unknown event type: %q", eventType))
1410	}
1411
1412}
1413
1414type writeAuthResponseFrame struct {
1415	data []byte
1416}
1417
1418func (a *writeAuthResponseFrame) String() string {
1419	return fmt.Sprintf("[auth_response data=%q]", a.data)
1420}
1421
1422func (a *writeAuthResponseFrame) writeFrame(framer *framer, streamID int) error {
1423	return framer.writeAuthResponseFrame(streamID, a.data)
1424}
1425
1426func (f *framer) writeAuthResponseFrame(streamID int, data []byte) error {
1427	f.writeHeader(f.flags, opAuthResponse, streamID)
1428	f.writeBytes(data)
1429	return f.finishWrite()
1430}
1431
1432type queryValues struct {
1433	value []byte
1434
1435	// optional name, will set With names for values flag
1436	name    string
1437	isUnset bool
1438}
1439
1440type queryParams struct {
1441	consistency Consistency
1442	// v2+
1443	skipMeta          bool
1444	values            []queryValues
1445	pageSize          int
1446	pagingState       []byte
1447	serialConsistency SerialConsistency
1448	// v3+
1449	defaultTimestamp      bool
1450	defaultTimestampValue int64
1451	// v5+
1452	keyspace string
1453}
1454
1455func (q queryParams) String() string {
1456	return fmt.Sprintf("[query_params consistency=%v skip_meta=%v page_size=%d paging_state=%q serial_consistency=%v default_timestamp=%v values=%v keyspace=%s]",
1457		q.consistency, q.skipMeta, q.pageSize, q.pagingState, q.serialConsistency, q.defaultTimestamp, q.values, q.keyspace)
1458}
1459
1460func (f *framer) writeQueryParams(opts *queryParams) {
1461	f.writeConsistency(opts.consistency)
1462
1463	if f.proto == protoVersion1 {
1464		return
1465	}
1466
1467	var flags byte
1468	if len(opts.values) > 0 {
1469		flags |= flagValues
1470	}
1471	if opts.skipMeta {
1472		flags |= flagSkipMetaData
1473	}
1474	if opts.pageSize > 0 {
1475		flags |= flagPageSize
1476	}
1477	if len(opts.pagingState) > 0 {
1478		flags |= flagWithPagingState
1479	}
1480	if opts.serialConsistency > 0 {
1481		flags |= flagWithSerialConsistency
1482	}
1483
1484	names := false
1485
1486	// protoV3 specific things
1487	if f.proto > protoVersion2 {
1488		if opts.defaultTimestamp {
1489			flags |= flagDefaultTimestamp
1490		}
1491
1492		if len(opts.values) > 0 && opts.values[0].name != "" {
1493			flags |= flagWithNameValues
1494			names = true
1495		}
1496	}
1497
1498	if opts.keyspace != "" {
1499		if f.proto > protoVersion4 {
1500			flags |= flagWithKeyspace
1501		} else {
1502			panic(fmt.Errorf("The keyspace can only be set with protocol 5 or higher"))
1503		}
1504	}
1505
1506	if f.proto > protoVersion4 {
1507		f.writeUint(uint32(flags))
1508	} else {
1509		f.writeByte(flags)
1510	}
1511
1512	if n := len(opts.values); n > 0 {
1513		f.writeShort(uint16(n))
1514
1515		for i := 0; i < n; i++ {
1516			if names {
1517				f.writeString(opts.values[i].name)
1518			}
1519			if opts.values[i].isUnset {
1520				f.writeUnset()
1521			} else {
1522				f.writeBytes(opts.values[i].value)
1523			}
1524		}
1525	}
1526
1527	if opts.pageSize > 0 {
1528		f.writeInt(int32(opts.pageSize))
1529	}
1530
1531	if len(opts.pagingState) > 0 {
1532		f.writeBytes(opts.pagingState)
1533	}
1534
1535	if opts.serialConsistency > 0 {
1536		f.writeConsistency(Consistency(opts.serialConsistency))
1537	}
1538
1539	if f.proto > protoVersion2 && opts.defaultTimestamp {
1540		// timestamp in microseconds
1541		var ts int64
1542		if opts.defaultTimestampValue != 0 {
1543			ts = opts.defaultTimestampValue
1544		} else {
1545			ts = time.Now().UnixNano() / 1000
1546		}
1547		f.writeLong(ts)
1548	}
1549
1550	if opts.keyspace != "" {
1551		f.writeString(opts.keyspace)
1552	}
1553}
1554
1555type writeQueryFrame struct {
1556	statement string
1557	params    queryParams
1558
1559	// v4+
1560	customPayload map[string][]byte
1561}
1562
1563func (w *writeQueryFrame) String() string {
1564	return fmt.Sprintf("[query statement=%q params=%v]", w.statement, w.params)
1565}
1566
1567func (w *writeQueryFrame) writeFrame(framer *framer, streamID int) error {
1568	return framer.writeQueryFrame(streamID, w.statement, &w.params, w.customPayload)
1569}
1570
1571func (f *framer) writeQueryFrame(streamID int, statement string, params *queryParams, customPayload map[string][]byte) error {
1572	if len(customPayload) > 0 {
1573		f.payload()
1574	}
1575	f.writeHeader(f.flags, opQuery, streamID)
1576	f.writeCustomPayload(&customPayload)
1577	f.writeLongString(statement)
1578	f.writeQueryParams(params)
1579
1580	return f.finishWrite()
1581}
1582
1583type frameWriter interface {
1584	writeFrame(framer *framer, streamID int) error
1585}
1586
1587type frameWriterFunc func(framer *framer, streamID int) error
1588
1589func (f frameWriterFunc) writeFrame(framer *framer, streamID int) error {
1590	return f(framer, streamID)
1591}
1592
1593type writeExecuteFrame struct {
1594	preparedID []byte
1595	params     queryParams
1596
1597	// v4+
1598	customPayload map[string][]byte
1599}
1600
1601func (e *writeExecuteFrame) String() string {
1602	return fmt.Sprintf("[execute id=% X params=%v]", e.preparedID, &e.params)
1603}
1604
1605func (e *writeExecuteFrame) writeFrame(fr *framer, streamID int) error {
1606	return fr.writeExecuteFrame(streamID, e.preparedID, &e.params, &e.customPayload)
1607}
1608
1609func (f *framer) writeExecuteFrame(streamID int, preparedID []byte, params *queryParams, customPayload *map[string][]byte) error {
1610	if len(*customPayload) > 0 {
1611		f.payload()
1612	}
1613	f.writeHeader(f.flags, opExecute, streamID)
1614	f.writeCustomPayload(customPayload)
1615	f.writeShortBytes(preparedID)
1616	if f.proto > protoVersion1 {
1617		f.writeQueryParams(params)
1618	} else {
1619		n := len(params.values)
1620		f.writeShort(uint16(n))
1621		for i := 0; i < n; i++ {
1622			if params.values[i].isUnset {
1623				f.writeUnset()
1624			} else {
1625				f.writeBytes(params.values[i].value)
1626			}
1627		}
1628		f.writeConsistency(params.consistency)
1629	}
1630
1631	return f.finishWrite()
1632}
1633
1634// TODO: can we replace BatchStatemt with batchStatement? As they prety much
1635// duplicate each other
1636type batchStatment struct {
1637	preparedID []byte
1638	statement  string
1639	values     []queryValues
1640}
1641
1642type writeBatchFrame struct {
1643	typ         BatchType
1644	statements  []batchStatment
1645	consistency Consistency
1646
1647	// v3+
1648	serialConsistency     SerialConsistency
1649	defaultTimestamp      bool
1650	defaultTimestampValue int64
1651
1652	//v4+
1653	customPayload map[string][]byte
1654}
1655
1656func (w *writeBatchFrame) writeFrame(framer *framer, streamID int) error {
1657	return framer.writeBatchFrame(streamID, w, w.customPayload)
1658}
1659
1660func (f *framer) writeBatchFrame(streamID int, w *writeBatchFrame, customPayload map[string][]byte) error {
1661	if len(customPayload) > 0 {
1662		f.payload()
1663	}
1664	f.writeHeader(f.flags, opBatch, streamID)
1665	f.writeCustomPayload(&customPayload)
1666	f.writeByte(byte(w.typ))
1667
1668	n := len(w.statements)
1669	f.writeShort(uint16(n))
1670
1671	var flags byte
1672
1673	for i := 0; i < n; i++ {
1674		b := &w.statements[i]
1675		if len(b.preparedID) == 0 {
1676			f.writeByte(0)
1677			f.writeLongString(b.statement)
1678		} else {
1679			f.writeByte(1)
1680			f.writeShortBytes(b.preparedID)
1681		}
1682
1683		f.writeShort(uint16(len(b.values)))
1684		for j := range b.values {
1685			col := b.values[j]
1686			if f.proto > protoVersion2 && col.name != "" {
1687				// TODO: move this check into the caller and set a flag on writeBatchFrame
1688				// to indicate using named values
1689				if f.proto <= protoVersion5 {
1690					return fmt.Errorf("gocql: named query values are not supported in batches, please see https://issues.apache.org/jira/browse/CASSANDRA-10246")
1691				}
1692				flags |= flagWithNameValues
1693				f.writeString(col.name)
1694			}
1695			if col.isUnset {
1696				f.writeUnset()
1697			} else {
1698				f.writeBytes(col.value)
1699			}
1700		}
1701	}
1702
1703	f.writeConsistency(w.consistency)
1704
1705	if f.proto > protoVersion2 {
1706		if w.serialConsistency > 0 {
1707			flags |= flagWithSerialConsistency
1708		}
1709		if w.defaultTimestamp {
1710			flags |= flagDefaultTimestamp
1711		}
1712
1713		if f.proto > protoVersion4 {
1714			f.writeUint(uint32(flags))
1715		} else {
1716			f.writeByte(flags)
1717		}
1718
1719		if w.serialConsistency > 0 {
1720			f.writeConsistency(Consistency(w.serialConsistency))
1721		}
1722
1723		if w.defaultTimestamp {
1724			var ts int64
1725			if w.defaultTimestampValue != 0 {
1726				ts = w.defaultTimestampValue
1727			} else {
1728				ts = time.Now().UnixNano() / 1000
1729			}
1730			f.writeLong(ts)
1731		}
1732	}
1733
1734	return f.finishWrite()
1735}
1736
1737type writeOptionsFrame struct{}
1738
1739func (w *writeOptionsFrame) writeFrame(framer *framer, streamID int) error {
1740	return framer.writeOptionsFrame(streamID, w)
1741}
1742
1743func (f *framer) writeOptionsFrame(stream int, _ *writeOptionsFrame) error {
1744	f.writeHeader(f.flags&^flagCompress, opOptions, stream)
1745	return f.finishWrite()
1746}
1747
1748type writeRegisterFrame struct {
1749	events []string
1750}
1751
1752func (w *writeRegisterFrame) writeFrame(framer *framer, streamID int) error {
1753	return framer.writeRegisterFrame(streamID, w)
1754}
1755
1756func (f *framer) writeRegisterFrame(streamID int, w *writeRegisterFrame) error {
1757	f.writeHeader(f.flags, opRegister, streamID)
1758	f.writeStringList(w.events)
1759
1760	return f.finishWrite()
1761}
1762
1763func (f *framer) readByte() byte {
1764	if len(f.rbuf) < 1 {
1765		panic(fmt.Errorf("not enough bytes in buffer to read byte require 1 got: %d", len(f.rbuf)))
1766	}
1767
1768	b := f.rbuf[0]
1769	f.rbuf = f.rbuf[1:]
1770	return b
1771}
1772
1773func (f *framer) readInt() (n int) {
1774	if len(f.rbuf) < 4 {
1775		panic(fmt.Errorf("not enough bytes in buffer to read int require 4 got: %d", len(f.rbuf)))
1776	}
1777
1778	n = int(int32(f.rbuf[0])<<24 | int32(f.rbuf[1])<<16 | int32(f.rbuf[2])<<8 | int32(f.rbuf[3]))
1779	f.rbuf = f.rbuf[4:]
1780	return
1781}
1782
1783func (f *framer) readShort() (n uint16) {
1784	if len(f.rbuf) < 2 {
1785		panic(fmt.Errorf("not enough bytes in buffer to read short require 2 got: %d", len(f.rbuf)))
1786	}
1787	n = uint16(f.rbuf[0])<<8 | uint16(f.rbuf[1])
1788	f.rbuf = f.rbuf[2:]
1789	return
1790}
1791
1792func (f *framer) readLong() (n int64) {
1793	if len(f.rbuf) < 8 {
1794		panic(fmt.Errorf("not enough bytes in buffer to read long require 8 got: %d", len(f.rbuf)))
1795	}
1796	n = int64(f.rbuf[0])<<56 | int64(f.rbuf[1])<<48 | int64(f.rbuf[2])<<40 | int64(f.rbuf[3])<<32 |
1797		int64(f.rbuf[4])<<24 | int64(f.rbuf[5])<<16 | int64(f.rbuf[6])<<8 | int64(f.rbuf[7])
1798	f.rbuf = f.rbuf[8:]
1799	return
1800}
1801
1802func (f *framer) readString() (s string) {
1803	size := f.readShort()
1804
1805	if len(f.rbuf) < int(size) {
1806		panic(fmt.Errorf("not enough bytes in buffer to read string require %d got: %d", size, len(f.rbuf)))
1807	}
1808
1809	s = string(f.rbuf[:size])
1810	f.rbuf = f.rbuf[size:]
1811	return
1812}
1813
1814func (f *framer) readLongString() (s string) {
1815	size := f.readInt()
1816
1817	if len(f.rbuf) < size {
1818		panic(fmt.Errorf("not enough bytes in buffer to read long string require %d got: %d", size, len(f.rbuf)))
1819	}
1820
1821	s = string(f.rbuf[:size])
1822	f.rbuf = f.rbuf[size:]
1823	return
1824}
1825
1826func (f *framer) readUUID() *UUID {
1827	if len(f.rbuf) < 16 {
1828		panic(fmt.Errorf("not enough bytes in buffer to read uuid require %d got: %d", 16, len(f.rbuf)))
1829	}
1830
1831	// TODO: how to handle this error, if it is a uuid, then sureley, problems?
1832	u, _ := UUIDFromBytes(f.rbuf[:16])
1833	f.rbuf = f.rbuf[16:]
1834	return &u
1835}
1836
1837func (f *framer) readStringList() []string {
1838	size := f.readShort()
1839
1840	l := make([]string, size)
1841	for i := 0; i < int(size); i++ {
1842		l[i] = f.readString()
1843	}
1844
1845	return l
1846}
1847
1848func (f *framer) readBytesInternal() ([]byte, error) {
1849	size := f.readInt()
1850	if size < 0 {
1851		return nil, nil
1852	}
1853
1854	if len(f.rbuf) < size {
1855		return nil, fmt.Errorf("not enough bytes in buffer to read bytes require %d got: %d", size, len(f.rbuf))
1856	}
1857
1858	l := f.rbuf[:size]
1859	f.rbuf = f.rbuf[size:]
1860
1861	return l, nil
1862}
1863
1864func (f *framer) readBytes() []byte {
1865	l, err := f.readBytesInternal()
1866	if err != nil {
1867		panic(err)
1868	}
1869
1870	return l
1871}
1872
1873func (f *framer) readShortBytes() []byte {
1874	size := f.readShort()
1875	if len(f.rbuf) < int(size) {
1876		panic(fmt.Errorf("not enough bytes in buffer to read short bytes: require %d got %d", size, len(f.rbuf)))
1877	}
1878
1879	l := f.rbuf[:size]
1880	f.rbuf = f.rbuf[size:]
1881
1882	return l
1883}
1884
1885func (f *framer) readInetAdressOnly() net.IP {
1886	if len(f.rbuf) < 1 {
1887		panic(fmt.Errorf("not enough bytes in buffer to read inet size require %d got: %d", 1, len(f.rbuf)))
1888	}
1889
1890	size := f.rbuf[0]
1891	f.rbuf = f.rbuf[1:]
1892
1893	if !(size == 4 || size == 16) {
1894		panic(fmt.Errorf("invalid IP size: %d", size))
1895	}
1896
1897	if len(f.rbuf) < 1 {
1898		panic(fmt.Errorf("not enough bytes in buffer to read inet require %d got: %d", size, len(f.rbuf)))
1899	}
1900
1901	ip := make([]byte, size)
1902	copy(ip, f.rbuf[:size])
1903	f.rbuf = f.rbuf[size:]
1904	return net.IP(ip)
1905}
1906
1907func (f *framer) readInet() (net.IP, int) {
1908	return f.readInetAdressOnly(), f.readInt()
1909}
1910
1911func (f *framer) readConsistency() Consistency {
1912	return Consistency(f.readShort())
1913}
1914
1915func (f *framer) readStringMap() map[string]string {
1916	size := f.readShort()
1917	m := make(map[string]string, size)
1918
1919	for i := 0; i < int(size); i++ {
1920		k := f.readString()
1921		v := f.readString()
1922		m[k] = v
1923	}
1924
1925	return m
1926}
1927
1928func (f *framer) readBytesMap() map[string][]byte {
1929	size := f.readShort()
1930	m := make(map[string][]byte, size)
1931
1932	for i := 0; i < int(size); i++ {
1933		k := f.readString()
1934		v := f.readBytes()
1935		m[k] = v
1936	}
1937
1938	return m
1939}
1940
1941func (f *framer) readStringMultiMap() map[string][]string {
1942	size := f.readShort()
1943	m := make(map[string][]string, size)
1944
1945	for i := 0; i < int(size); i++ {
1946		k := f.readString()
1947		v := f.readStringList()
1948		m[k] = v
1949	}
1950
1951	return m
1952}
1953
1954func (f *framer) writeByte(b byte) {
1955	f.wbuf = append(f.wbuf, b)
1956}
1957
1958func appendBytes(p []byte, d []byte) []byte {
1959	if d == nil {
1960		return appendInt(p, -1)
1961	}
1962	p = appendInt(p, int32(len(d)))
1963	p = append(p, d...)
1964	return p
1965}
1966
1967func appendShort(p []byte, n uint16) []byte {
1968	return append(p,
1969		byte(n>>8),
1970		byte(n),
1971	)
1972}
1973
1974func appendInt(p []byte, n int32) []byte {
1975	return append(p, byte(n>>24),
1976		byte(n>>16),
1977		byte(n>>8),
1978		byte(n))
1979}
1980
1981func appendUint(p []byte, n uint32) []byte {
1982	return append(p, byte(n>>24),
1983		byte(n>>16),
1984		byte(n>>8),
1985		byte(n))
1986}
1987
1988func appendLong(p []byte, n int64) []byte {
1989	return append(p,
1990		byte(n>>56),
1991		byte(n>>48),
1992		byte(n>>40),
1993		byte(n>>32),
1994		byte(n>>24),
1995		byte(n>>16),
1996		byte(n>>8),
1997		byte(n),
1998	)
1999}
2000
2001func (f *framer) writeCustomPayload(customPayload *map[string][]byte) {
2002	if len(*customPayload) > 0 {
2003		if f.proto < protoVersion4 {
2004			panic("Custom payload is not supported with version V3 or less")
2005		}
2006		f.writeBytesMap(*customPayload)
2007	}
2008}
2009
2010// these are protocol level binary types
2011func (f *framer) writeInt(n int32) {
2012	f.wbuf = appendInt(f.wbuf, n)
2013}
2014
2015func (f *framer) writeUint(n uint32) {
2016	f.wbuf = appendUint(f.wbuf, n)
2017}
2018
2019func (f *framer) writeShort(n uint16) {
2020	f.wbuf = appendShort(f.wbuf, n)
2021}
2022
2023func (f *framer) writeLong(n int64) {
2024	f.wbuf = appendLong(f.wbuf, n)
2025}
2026
2027func (f *framer) writeString(s string) {
2028	f.writeShort(uint16(len(s)))
2029	f.wbuf = append(f.wbuf, s...)
2030}
2031
2032func (f *framer) writeLongString(s string) {
2033	f.writeInt(int32(len(s)))
2034	f.wbuf = append(f.wbuf, s...)
2035}
2036
2037func (f *framer) writeUUID(u *UUID) {
2038	f.wbuf = append(f.wbuf, u[:]...)
2039}
2040
2041func (f *framer) writeStringList(l []string) {
2042	f.writeShort(uint16(len(l)))
2043	for _, s := range l {
2044		f.writeString(s)
2045	}
2046}
2047
2048func (f *framer) writeUnset() {
2049	// Protocol version 4 specifies that bind variables do not require having a
2050	// value when executing a statement.   Bind variables without a value are
2051	// called 'unset'. The 'unset' bind variable is serialized as the int
2052	// value '-2' without following bytes.
2053	f.writeInt(-2)
2054}
2055
2056func (f *framer) writeBytes(p []byte) {
2057	// TODO: handle null case correctly,
2058	//     [bytes]        A [int] n, followed by n bytes if n >= 0. If n < 0,
2059	//					  no byte should follow and the value represented is `null`.
2060	if p == nil {
2061		f.writeInt(-1)
2062	} else {
2063		f.writeInt(int32(len(p)))
2064		f.wbuf = append(f.wbuf, p...)
2065	}
2066}
2067
2068func (f *framer) writeShortBytes(p []byte) {
2069	f.writeShort(uint16(len(p)))
2070	f.wbuf = append(f.wbuf, p...)
2071}
2072
2073func (f *framer) writeInet(ip net.IP, port int) {
2074	f.wbuf = append(f.wbuf,
2075		byte(len(ip)),
2076	)
2077
2078	f.wbuf = append(f.wbuf,
2079		[]byte(ip)...,
2080	)
2081
2082	f.writeInt(int32(port))
2083}
2084
2085func (f *framer) writeConsistency(cons Consistency) {
2086	f.writeShort(uint16(cons))
2087}
2088
2089func (f *framer) writeStringMap(m map[string]string) {
2090	f.writeShort(uint16(len(m)))
2091	for k, v := range m {
2092		f.writeString(k)
2093		f.writeString(v)
2094	}
2095}
2096
2097func (f *framer) writeBytesMap(m map[string][]byte) {
2098	f.writeShort(uint16(len(m)))
2099	for k, v := range m {
2100		f.writeString(k)
2101		f.writeBytes(v)
2102	}
2103}
2104