1package mssql
2
3import (
4	"context"
5	"encoding/binary"
6	"errors"
7	"fmt"
8	"io"
9	"net"
10	"strconv"
11	"strings"
12)
13
14//go:generate stringer -type token
15
16type token byte
17
18// token ids
19const (
20	tokenReturnStatus  token = 121 // 0x79
21	tokenColMetadata   token = 129 // 0x81
22	tokenOrder         token = 169 // 0xA9
23	tokenError         token = 170 // 0xAA
24	tokenInfo          token = 171 // 0xAB
25	tokenReturnValue   token = 0xAC
26	tokenLoginAck      token = 173 // 0xad
27	tokenFeatureExtAck token = 174 // 0xae
28	tokenRow           token = 209 // 0xd1
29	tokenNbcRow        token = 210 // 0xd2
30	tokenEnvChange     token = 227 // 0xE3
31	tokenSSPI          token = 237 // 0xED
32	tokenDone          token = 253 // 0xFD
33	tokenDoneProc      token = 254
34	tokenDoneInProc    token = 255
35)
36
37// done flags
38// https://msdn.microsoft.com/en-us/library/dd340421.aspx
39const (
40	doneFinal    = 0
41	doneMore     = 1
42	doneError    = 2
43	doneInxact   = 4
44	doneCount    = 0x10
45	doneAttn     = 0x20
46	doneSrvError = 0x100
47)
48
49// ENVCHANGE types
50// http://msdn.microsoft.com/en-us/library/dd303449.aspx
51const (
52	envTypDatabase           = 1
53	envTypLanguage           = 2
54	envTypCharset            = 3
55	envTypPacketSize         = 4
56	envSortId                = 5
57	envSortFlags             = 6
58	envSqlCollation          = 7
59	envTypBeginTran          = 8
60	envTypCommitTran         = 9
61	envTypRollbackTran       = 10
62	envEnlistDTC             = 11
63	envDefectTran            = 12
64	envDatabaseMirrorPartner = 13
65	envPromoteTran           = 15
66	envTranMgrAddr           = 16
67	envTranEnded             = 17
68	envResetConnAck          = 18
69	envStartedInstanceName   = 19
70	envRouting               = 20
71)
72
73// COLMETADATA flags
74// https://msdn.microsoft.com/en-us/library/dd357363.aspx
75const (
76	colFlagNullable = 1
77	// TODO implement more flags
78)
79
80// interface for all tokens
81type tokenStruct interface{}
82
83type orderStruct struct {
84	ColIds []uint16
85}
86
87type doneStruct struct {
88	Status   uint16
89	CurCmd   uint16
90	RowCount uint64
91	errors   []Error
92}
93
94func (d doneStruct) isError() bool {
95	return d.Status&doneError != 0 || len(d.errors) > 0
96}
97
98func (d doneStruct) getError() Error {
99	if len(d.errors) > 0 {
100		return d.errors[len(d.errors)-1]
101	} else {
102		return Error{Message: "Request failed but didn't provide reason"}
103	}
104}
105
106type doneInProcStruct doneStruct
107
108var doneFlags2str = map[uint16]string{
109	doneFinal:    "final",
110	doneMore:     "more",
111	doneError:    "error",
112	doneInxact:   "inxact",
113	doneCount:    "count",
114	doneAttn:     "attn",
115	doneSrvError: "srverror",
116}
117
118func doneFlags2Str(flags uint16) string {
119	strs := make([]string, 0, len(doneFlags2str))
120	for flag, tag := range doneFlags2str {
121		if flags&flag != 0 {
122			strs = append(strs, tag)
123		}
124	}
125	return strings.Join(strs, "|")
126}
127
128// ENVCHANGE stream
129// http://msdn.microsoft.com/en-us/library/dd303449.aspx
130func processEnvChg(sess *tdsSession) {
131	size := sess.buf.uint16()
132	r := &io.LimitedReader{R: sess.buf, N: int64(size)}
133	for {
134		var err error
135		var envtype uint8
136		err = binary.Read(r, binary.LittleEndian, &envtype)
137		if err == io.EOF {
138			return
139		}
140		if err != nil {
141			badStreamPanic(err)
142		}
143		switch envtype {
144		case envTypDatabase:
145			sess.database, err = readBVarChar(r)
146			if err != nil {
147				badStreamPanic(err)
148			}
149			_, err = readBVarChar(r)
150			if err != nil {
151				badStreamPanic(err)
152			}
153		case envTypLanguage:
154			// currently ignored
155			// new value
156			if _, err = readBVarChar(r); err != nil {
157				badStreamPanic(err)
158			}
159			// old value
160			if _, err = readBVarChar(r); err != nil {
161				badStreamPanic(err)
162			}
163		case envTypCharset:
164			// currently ignored
165			// new value
166			if _, err = readBVarChar(r); err != nil {
167				badStreamPanic(err)
168			}
169			// old value
170			if _, err = readBVarChar(r); err != nil {
171				badStreamPanic(err)
172			}
173		case envTypPacketSize:
174			packetsize, err := readBVarChar(r)
175			if err != nil {
176				badStreamPanic(err)
177			}
178			_, err = readBVarChar(r)
179			if err != nil {
180				badStreamPanic(err)
181			}
182			packetsizei, err := strconv.Atoi(packetsize)
183			if err != nil {
184				badStreamPanicf("Invalid Packet size value returned from server (%s): %s", packetsize, err.Error())
185			}
186			sess.buf.ResizeBuffer(packetsizei)
187		case envSortId:
188			// currently ignored
189			// new value
190			if _, err = readBVarChar(r); err != nil {
191				badStreamPanic(err)
192			}
193			// old value, should be 0
194			if _, err = readBVarChar(r); err != nil {
195				badStreamPanic(err)
196			}
197		case envSortFlags:
198			// currently ignored
199			// new value
200			if _, err = readBVarChar(r); err != nil {
201				badStreamPanic(err)
202			}
203			// old value, should be 0
204			if _, err = readBVarChar(r); err != nil {
205				badStreamPanic(err)
206			}
207		case envSqlCollation:
208			// currently ignored
209			var collationSize uint8
210			err = binary.Read(r, binary.LittleEndian, &collationSize)
211			if err != nil {
212				badStreamPanic(err)
213			}
214
215			// SQL Collation data should contain 5 bytes in length
216			if collationSize != 5 {
217				badStreamPanicf("Invalid SQL Collation size value returned from server: %d", collationSize)
218			}
219
220			// 4 bytes, contains: LCID ColFlags Version
221			var info uint32
222			err = binary.Read(r, binary.LittleEndian, &info)
223			if err != nil {
224				badStreamPanic(err)
225			}
226
227			// 1 byte, contains: sortID
228			var sortID uint8
229			err = binary.Read(r, binary.LittleEndian, &sortID)
230			if err != nil {
231				badStreamPanic(err)
232			}
233
234			// old value, should be 0
235			if _, err = readBVarChar(r); err != nil {
236				badStreamPanic(err)
237			}
238		case envTypBeginTran:
239			tranid, err := readBVarByte(r)
240			if len(tranid) != 8 {
241				badStreamPanicf("invalid size of transaction identifier: %d", len(tranid))
242			}
243			sess.tranid = binary.LittleEndian.Uint64(tranid)
244			if err != nil {
245				badStreamPanic(err)
246			}
247			if sess.logFlags&logTransaction != 0 {
248				sess.log.Printf("BEGIN TRANSACTION %x\n", sess.tranid)
249			}
250			_, err = readBVarByte(r)
251			if err != nil {
252				badStreamPanic(err)
253			}
254		case envTypCommitTran, envTypRollbackTran:
255			_, err = readBVarByte(r)
256			if err != nil {
257				badStreamPanic(err)
258			}
259			_, err = readBVarByte(r)
260			if err != nil {
261				badStreamPanic(err)
262			}
263			if sess.logFlags&logTransaction != 0 {
264				if envtype == envTypCommitTran {
265					sess.log.Printf("COMMIT TRANSACTION %x\n", sess.tranid)
266				} else {
267					sess.log.Printf("ROLLBACK TRANSACTION %x\n", sess.tranid)
268				}
269			}
270			sess.tranid = 0
271		case envEnlistDTC:
272			// currently ignored
273			// new value, should be 0
274			if _, err = readBVarChar(r); err != nil {
275				badStreamPanic(err)
276			}
277			// old value
278			if _, err = readBVarChar(r); err != nil {
279				badStreamPanic(err)
280			}
281		case envDefectTran:
282			// currently ignored
283			// new value
284			if _, err = readBVarChar(r); err != nil {
285				badStreamPanic(err)
286			}
287			// old value, should be 0
288			if _, err = readBVarChar(r); err != nil {
289				badStreamPanic(err)
290			}
291		case envDatabaseMirrorPartner:
292			sess.partner, err = readBVarChar(r)
293			if err != nil {
294				badStreamPanic(err)
295			}
296			_, err = readBVarChar(r)
297			if err != nil {
298				badStreamPanic(err)
299			}
300		case envPromoteTran:
301			// currently ignored
302			// old value, should be 0
303			if _, err = readBVarChar(r); err != nil {
304				badStreamPanic(err)
305			}
306			// dtc token
307			// spec says it should be L_VARBYTE, so this code might be wrong
308			if _, err = readBVarChar(r); err != nil {
309				badStreamPanic(err)
310			}
311		case envTranMgrAddr:
312			// currently ignored
313			// old value, should be 0
314			if _, err = readBVarChar(r); err != nil {
315				badStreamPanic(err)
316			}
317			// XACT_MANAGER_ADDRESS = B_VARBYTE
318			if _, err = readBVarChar(r); err != nil {
319				badStreamPanic(err)
320			}
321		case envTranEnded:
322			// currently ignored
323			// old value, B_VARBYTE
324			if _, err = readBVarChar(r); err != nil {
325				badStreamPanic(err)
326			}
327			// should be 0
328			if _, err = readBVarChar(r); err != nil {
329				badStreamPanic(err)
330			}
331		case envResetConnAck:
332			// currently ignored
333			// old value, should be 0
334			if _, err = readBVarChar(r); err != nil {
335				badStreamPanic(err)
336			}
337			// should be 0
338			if _, err = readBVarChar(r); err != nil {
339				badStreamPanic(err)
340			}
341		case envStartedInstanceName:
342			// currently ignored
343			// old value, should be 0
344			if _, err = readBVarChar(r); err != nil {
345				badStreamPanic(err)
346			}
347			// instance name
348			if _, err = readBVarChar(r); err != nil {
349				badStreamPanic(err)
350			}
351		case envRouting:
352			// RoutingData message is:
353			// ValueLength                 USHORT
354			// Protocol (TCP = 0)          BYTE
355			// ProtocolProperty (new port) USHORT
356			// AlternateServer             US_VARCHAR
357			_, err := readUshort(r)
358			if err != nil {
359				badStreamPanic(err)
360			}
361			protocol, err := readByte(r)
362			if err != nil || protocol != 0 {
363				badStreamPanic(err)
364			}
365			newPort, err := readUshort(r)
366			if err != nil {
367				badStreamPanic(err)
368			}
369			newServer, err := readUsVarChar(r)
370			if err != nil {
371				badStreamPanic(err)
372			}
373			// consume the OLDVALUE = %x00 %x00
374			_, err = readUshort(r)
375			if err != nil {
376				badStreamPanic(err)
377			}
378			sess.routedServer = newServer
379			sess.routedPort = newPort
380		default:
381			// ignore rest of records because we don't know how to skip those
382			sess.log.Printf("WARN: Unknown ENVCHANGE record detected with type id = %d\n", envtype)
383			break
384		}
385
386	}
387}
388
389// http://msdn.microsoft.com/en-us/library/dd358180.aspx
390func parseReturnStatus(r *tdsBuffer) ReturnStatus {
391	return ReturnStatus(r.int32())
392}
393
394func parseOrder(r *tdsBuffer) (res orderStruct) {
395	len := int(r.uint16())
396	res.ColIds = make([]uint16, len/2)
397	for i := 0; i < len/2; i++ {
398		res.ColIds[i] = r.uint16()
399	}
400	return res
401}
402
403// https://msdn.microsoft.com/en-us/library/dd340421.aspx
404func parseDone(r *tdsBuffer) (res doneStruct) {
405	res.Status = r.uint16()
406	res.CurCmd = r.uint16()
407	res.RowCount = r.uint64()
408	return res
409}
410
411// https://msdn.microsoft.com/en-us/library/dd340553.aspx
412func parseDoneInProc(r *tdsBuffer) (res doneInProcStruct) {
413	res.Status = r.uint16()
414	res.CurCmd = r.uint16()
415	res.RowCount = r.uint64()
416	return res
417}
418
419type sspiMsg []byte
420
421func parseSSPIMsg(r *tdsBuffer) sspiMsg {
422	size := r.uint16()
423	buf := make([]byte, size)
424	r.ReadFull(buf)
425	return sspiMsg(buf)
426}
427
428type loginAckStruct struct {
429	Interface  uint8
430	TDSVersion uint32
431	ProgName   string
432	ProgVer    uint32
433}
434
435func parseLoginAck(r *tdsBuffer) loginAckStruct {
436	size := r.uint16()
437	buf := make([]byte, size)
438	r.ReadFull(buf)
439	var res loginAckStruct
440	res.Interface = buf[0]
441	res.TDSVersion = binary.BigEndian.Uint32(buf[1:])
442	prognamelen := buf[1+4]
443	var err error
444	if res.ProgName, err = ucs22str(buf[1+4+1 : 1+4+1+prognamelen*2]); err != nil {
445		badStreamPanic(err)
446	}
447	res.ProgVer = binary.BigEndian.Uint32(buf[size-4:])
448	return res
449}
450
451// https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-tds/2eb82f8e-11f0-46dc-b42d-27302fa4701a
452func parseFeatureExtAck(r *tdsBuffer) {
453	// at most 1 featureAck per feature in featureExt
454	// go-mssqldb will add at most 1 feature, the spec defines 7 different features
455	for i := 0; i < 8; i++ {
456		featureID := r.byte() // FeatureID
457		if featureID == 0xff {
458			return
459		}
460		size := r.uint32() // FeatureAckDataLen
461		d := make([]byte, size)
462		r.ReadFull(d)
463	}
464	panic("parsed more than 7 featureAck's, protocol implementation error?")
465}
466
467// http://msdn.microsoft.com/en-us/library/dd357363.aspx
468func parseColMetadata72(r *tdsBuffer) (columns []columnStruct) {
469	count := r.uint16()
470	if count == 0xffff {
471		// no metadata is sent
472		return nil
473	}
474	columns = make([]columnStruct, count)
475	for i := range columns {
476		column := &columns[i]
477		column.UserType = r.uint32()
478		column.Flags = r.uint16()
479
480		// parsing TYPE_INFO structure
481		column.ti = readTypeInfo(r)
482		column.ColName = r.BVarChar()
483	}
484	return columns
485}
486
487// http://msdn.microsoft.com/en-us/library/dd357254.aspx
488func parseRow(r *tdsBuffer, columns []columnStruct, row []interface{}) {
489	for i, column := range columns {
490		row[i] = column.ti.Reader(&column.ti, r)
491	}
492}
493
494// http://msdn.microsoft.com/en-us/library/dd304783.aspx
495func parseNbcRow(r *tdsBuffer, columns []columnStruct, row []interface{}) {
496	bitlen := (len(columns) + 7) / 8
497	pres := make([]byte, bitlen)
498	r.ReadFull(pres)
499	for i, col := range columns {
500		if pres[i/8]&(1<<(uint(i)%8)) != 0 {
501			row[i] = nil
502			continue
503		}
504		row[i] = col.ti.Reader(&col.ti, r)
505	}
506}
507
508// http://msdn.microsoft.com/en-us/library/dd304156.aspx
509func parseError72(r *tdsBuffer) (res Error) {
510	length := r.uint16()
511	_ = length // ignore length
512	res.Number = r.int32()
513	res.State = r.byte()
514	res.Class = r.byte()
515	res.Message = r.UsVarChar()
516	res.ServerName = r.BVarChar()
517	res.ProcName = r.BVarChar()
518	res.LineNo = r.int32()
519	return
520}
521
522// http://msdn.microsoft.com/en-us/library/dd304156.aspx
523func parseInfo(r *tdsBuffer) (res Error) {
524	length := r.uint16()
525	_ = length // ignore length
526	res.Number = r.int32()
527	res.State = r.byte()
528	res.Class = r.byte()
529	res.Message = r.UsVarChar()
530	res.ServerName = r.BVarChar()
531	res.ProcName = r.BVarChar()
532	res.LineNo = r.int32()
533	return
534}
535
536// https://msdn.microsoft.com/en-us/library/dd303881.aspx
537func parseReturnValue(r *tdsBuffer) (nv namedValue) {
538	/*
539		ParamOrdinal
540		ParamName
541		Status
542		UserType
543		Flags
544		TypeInfo
545		CryptoMetadata
546		Value
547	*/
548	r.uint16()
549	nv.Name = r.BVarChar()
550	r.byte()
551	r.uint32() // UserType (uint16 prior to 7.2)
552	r.uint16()
553	ti := readTypeInfo(r)
554	nv.Value = ti.Reader(&ti, r)
555	return
556}
557
558func processSingleResponse(sess *tdsSession, ch chan tokenStruct, outs map[string]interface{}) {
559	defer func() {
560		if err := recover(); err != nil {
561			if sess.logFlags&logErrors != 0 {
562				sess.log.Printf("ERROR: Intercepted panic %v", err)
563			}
564			ch <- err
565		}
566		close(ch)
567	}()
568
569	packet_type, err := sess.buf.BeginRead()
570	if err != nil {
571		if sess.logFlags&logErrors != 0 {
572			sess.log.Printf("ERROR: BeginRead failed %v", err)
573		}
574		ch <- err
575		return
576	}
577	if packet_type != packReply {
578		badStreamPanic(fmt.Errorf("unexpected packet type in reply: got %v, expected %v", packet_type, packReply))
579	}
580	var columns []columnStruct
581	errs := make([]Error, 0, 5)
582	for {
583		token := token(sess.buf.byte())
584		if sess.logFlags&logDebug != 0 {
585			sess.log.Printf("got token %v", token)
586		}
587		switch token {
588		case tokenSSPI:
589			ch <- parseSSPIMsg(sess.buf)
590			return
591		case tokenReturnStatus:
592			returnStatus := parseReturnStatus(sess.buf)
593			ch <- returnStatus
594		case tokenLoginAck:
595			loginAck := parseLoginAck(sess.buf)
596			ch <- loginAck
597		case tokenFeatureExtAck:
598			parseFeatureExtAck(sess.buf)
599		case tokenOrder:
600			order := parseOrder(sess.buf)
601			ch <- order
602		case tokenDoneInProc:
603			done := parseDoneInProc(sess.buf)
604			if sess.logFlags&logRows != 0 && done.Status&doneCount != 0 {
605				sess.log.Printf("(%d row(s) affected)\n", done.RowCount)
606			}
607			ch <- done
608		case tokenDone, tokenDoneProc:
609			done := parseDone(sess.buf)
610			done.errors = errs
611			if sess.logFlags&logDebug != 0 {
612				sess.log.Printf("got DONE or DONEPROC status=%d", done.Status)
613			}
614			if done.Status&doneSrvError != 0 {
615				ch <- errors.New("SQL Server had internal error")
616				return
617			}
618			if sess.logFlags&logRows != 0 && done.Status&doneCount != 0 {
619				sess.log.Printf("(%d row(s) affected)\n", done.RowCount)
620			}
621			ch <- done
622			if done.Status&doneMore == 0 {
623				return
624			}
625		case tokenColMetadata:
626			columns = parseColMetadata72(sess.buf)
627			ch <- columns
628		case tokenRow:
629			row := make([]interface{}, len(columns))
630			parseRow(sess.buf, columns, row)
631			ch <- row
632		case tokenNbcRow:
633			row := make([]interface{}, len(columns))
634			parseNbcRow(sess.buf, columns, row)
635			ch <- row
636		case tokenEnvChange:
637			processEnvChg(sess)
638		case tokenError:
639			err := parseError72(sess.buf)
640			if sess.logFlags&logDebug != 0 {
641				sess.log.Printf("got ERROR %d %s", err.Number, err.Message)
642			}
643			errs = append(errs, err)
644			if sess.logFlags&logErrors != 0 {
645				sess.log.Println(err.Message)
646			}
647		case tokenInfo:
648			info := parseInfo(sess.buf)
649			if sess.logFlags&logDebug != 0 {
650				sess.log.Printf("got INFO %d %s", info.Number, info.Message)
651			}
652			if sess.logFlags&logMessages != 0 {
653				sess.log.Println(info.Message)
654			}
655		case tokenReturnValue:
656			nv := parseReturnValue(sess.buf)
657			if len(nv.Name) > 0 {
658				name := nv.Name[1:] // Remove the leading "@".
659				if ov, has := outs[name]; has {
660					err = scanIntoOut(name, nv.Value, ov)
661					if err != nil {
662						fmt.Println("scan error", err)
663						ch <- err
664					}
665				}
666			}
667		default:
668			badStreamPanic(fmt.Errorf("unknown token type returned: %v", token))
669		}
670	}
671}
672
673type parseRespIter byte
674
675const (
676	parseRespIterContinue parseRespIter = iota // Continue parsing current token.
677	parseRespIterNext                          // Fetch the next token.
678	parseRespIterDone                          // Done with parsing the response.
679)
680
681type parseRespState byte
682
683const (
684	parseRespStateNormal  parseRespState = iota // Normal response state.
685	parseRespStateCancel                        // Query is canceled, wait for server to confirm.
686	parseRespStateClosing                       // Waiting for tokens to come through.
687)
688
689type parseResp struct {
690	sess        *tdsSession
691	ctxDone     <-chan struct{}
692	state       parseRespState
693	cancelError error
694}
695
696func (ts *parseResp) sendAttention(ch chan tokenStruct) parseRespIter {
697	if err := sendAttention(ts.sess.buf); err != nil {
698		ts.dlogf("failed to send attention signal %v", err)
699		ch <- err
700		return parseRespIterDone
701	}
702	ts.state = parseRespStateCancel
703	return parseRespIterContinue
704}
705
706func (ts *parseResp) dlog(msg string) {
707	if ts.sess.logFlags&logDebug != 0 {
708		ts.sess.log.Println(msg)
709	}
710}
711func (ts *parseResp) dlogf(f string, v ...interface{}) {
712	if ts.sess.logFlags&logDebug != 0 {
713		ts.sess.log.Printf(f, v...)
714	}
715}
716
717func (ts *parseResp) iter(ctx context.Context, ch chan tokenStruct, tokChan chan tokenStruct) parseRespIter {
718	switch ts.state {
719	default:
720		panic("unknown state")
721	case parseRespStateNormal:
722		select {
723		case tok, ok := <-tokChan:
724			if !ok {
725				ts.dlog("response finished")
726				return parseRespIterDone
727			}
728			if err, ok := tok.(net.Error); ok && err.Timeout() {
729				ts.cancelError = err
730				ts.dlog("got timeout error, sending attention signal to server")
731				return ts.sendAttention(ch)
732			}
733			// Pass the token along.
734			ch <- tok
735			return parseRespIterContinue
736
737		case <-ts.ctxDone:
738			ts.ctxDone = nil
739			ts.dlog("got cancel message, sending attention signal to server")
740			return ts.sendAttention(ch)
741		}
742	case parseRespStateCancel: // Read all responses until a DONE or error is received.Auth
743		select {
744		case tok, ok := <-tokChan:
745			if !ok {
746				ts.dlog("response finished but waiting for attention ack")
747				return parseRespIterNext
748			}
749			switch tok := tok.(type) {
750			default:
751				// Ignore all other tokens while waiting.
752				// The TDS spec says other tokens may arrive after an attention
753				// signal is sent. Ignore these tokens and continue looking for
754				// a DONE with attention confirm mark.
755			case doneStruct:
756				if tok.Status&doneAttn != 0 {
757					ts.dlog("got cancellation confirmation from server")
758					if ts.cancelError != nil {
759						ch <- ts.cancelError
760						ts.cancelError = nil
761					} else {
762						ch <- ctx.Err()
763					}
764					return parseRespIterDone
765				}
766
767			// If an error happens during cancel, pass it along and just stop.
768			// We are uncertain to receive more tokens.
769			case error:
770				ch <- tok
771				ts.state = parseRespStateClosing
772			}
773			return parseRespIterContinue
774		case <-ts.ctxDone:
775			ts.ctxDone = nil
776			ts.state = parseRespStateClosing
777			return parseRespIterContinue
778		}
779	case parseRespStateClosing: // Wait for current token chan to close.
780		if _, ok := <-tokChan; !ok {
781			ts.dlog("response finished")
782			return parseRespIterDone
783		}
784		return parseRespIterContinue
785	}
786}
787
788func processResponse(ctx context.Context, sess *tdsSession, ch chan tokenStruct, outs map[string]interface{}) {
789	ts := &parseResp{
790		sess:    sess,
791		ctxDone: ctx.Done(),
792	}
793	defer func() {
794		// Ensure any remaining error is piped through
795		// or the query may look like it executed when it actually failed.
796		if ts.cancelError != nil {
797			ch <- ts.cancelError
798			ts.cancelError = nil
799		}
800		close(ch)
801	}()
802
803	// Loop over multiple responses.
804	for {
805		ts.dlog("initiating response reading")
806
807		tokChan := make(chan tokenStruct)
808		go processSingleResponse(sess, tokChan, outs)
809
810		// Loop over multiple tokens in response.
811	tokensLoop:
812		for {
813			switch ts.iter(ctx, ch, tokChan) {
814			case parseRespIterContinue:
815				// Nothing, continue to next token.
816			case parseRespIterNext:
817				break tokensLoop
818			case parseRespIterDone:
819				return
820			}
821		}
822	}
823}
824