1package mssql 2 3import ( 4 "context" 5 "encoding/binary" 6 "errors" 7 "fmt" 8 "io" 9 "net" 10 "strconv" 11 "strings" 12) 13 14//go:generate stringer -type token 15 16type token byte 17 18// token ids 19const ( 20 tokenReturnStatus token = 121 // 0x79 21 tokenColMetadata token = 129 // 0x81 22 tokenOrder token = 169 // 0xA9 23 tokenError token = 170 // 0xAA 24 tokenInfo token = 171 // 0xAB 25 tokenReturnValue token = 0xAC 26 tokenLoginAck token = 173 // 0xad 27 tokenFeatureExtAck token = 174 // 0xae 28 tokenRow token = 209 // 0xd1 29 tokenNbcRow token = 210 // 0xd2 30 tokenEnvChange token = 227 // 0xE3 31 tokenSSPI token = 237 // 0xED 32 tokenDone token = 253 // 0xFD 33 tokenDoneProc token = 254 34 tokenDoneInProc token = 255 35) 36 37// done flags 38// https://msdn.microsoft.com/en-us/library/dd340421.aspx 39const ( 40 doneFinal = 0 41 doneMore = 1 42 doneError = 2 43 doneInxact = 4 44 doneCount = 0x10 45 doneAttn = 0x20 46 doneSrvError = 0x100 47) 48 49// ENVCHANGE types 50// http://msdn.microsoft.com/en-us/library/dd303449.aspx 51const ( 52 envTypDatabase = 1 53 envTypLanguage = 2 54 envTypCharset = 3 55 envTypPacketSize = 4 56 envSortId = 5 57 envSortFlags = 6 58 envSqlCollation = 7 59 envTypBeginTran = 8 60 envTypCommitTran = 9 61 envTypRollbackTran = 10 62 envEnlistDTC = 11 63 envDefectTran = 12 64 envDatabaseMirrorPartner = 13 65 envPromoteTran = 15 66 envTranMgrAddr = 16 67 envTranEnded = 17 68 envResetConnAck = 18 69 envStartedInstanceName = 19 70 envRouting = 20 71) 72 73// COLMETADATA flags 74// https://msdn.microsoft.com/en-us/library/dd357363.aspx 75const ( 76 colFlagNullable = 1 77 // TODO implement more flags 78) 79 80// interface for all tokens 81type tokenStruct interface{} 82 83type orderStruct struct { 84 ColIds []uint16 85} 86 87type doneStruct struct { 88 Status uint16 89 CurCmd uint16 90 RowCount uint64 91 errors []Error 92} 93 94func (d doneStruct) isError() bool { 95 return d.Status&doneError != 0 || len(d.errors) > 0 96} 97 98func (d doneStruct) getError() Error { 99 if len(d.errors) > 0 { 100 return d.errors[len(d.errors)-1] 101 } else { 102 return Error{Message: "Request failed but didn't provide reason"} 103 } 104} 105 106type doneInProcStruct doneStruct 107 108var doneFlags2str = map[uint16]string{ 109 doneFinal: "final", 110 doneMore: "more", 111 doneError: "error", 112 doneInxact: "inxact", 113 doneCount: "count", 114 doneAttn: "attn", 115 doneSrvError: "srverror", 116} 117 118func doneFlags2Str(flags uint16) string { 119 strs := make([]string, 0, len(doneFlags2str)) 120 for flag, tag := range doneFlags2str { 121 if flags&flag != 0 { 122 strs = append(strs, tag) 123 } 124 } 125 return strings.Join(strs, "|") 126} 127 128// ENVCHANGE stream 129// http://msdn.microsoft.com/en-us/library/dd303449.aspx 130func processEnvChg(sess *tdsSession) { 131 size := sess.buf.uint16() 132 r := &io.LimitedReader{R: sess.buf, N: int64(size)} 133 for { 134 var err error 135 var envtype uint8 136 err = binary.Read(r, binary.LittleEndian, &envtype) 137 if err == io.EOF { 138 return 139 } 140 if err != nil { 141 badStreamPanic(err) 142 } 143 switch envtype { 144 case envTypDatabase: 145 sess.database, err = readBVarChar(r) 146 if err != nil { 147 badStreamPanic(err) 148 } 149 _, err = readBVarChar(r) 150 if err != nil { 151 badStreamPanic(err) 152 } 153 case envTypLanguage: 154 // currently ignored 155 // new value 156 if _, err = readBVarChar(r); err != nil { 157 badStreamPanic(err) 158 } 159 // old value 160 if _, err = readBVarChar(r); err != nil { 161 badStreamPanic(err) 162 } 163 case envTypCharset: 164 // currently ignored 165 // new value 166 if _, err = readBVarChar(r); err != nil { 167 badStreamPanic(err) 168 } 169 // old value 170 if _, err = readBVarChar(r); err != nil { 171 badStreamPanic(err) 172 } 173 case envTypPacketSize: 174 packetsize, err := readBVarChar(r) 175 if err != nil { 176 badStreamPanic(err) 177 } 178 _, err = readBVarChar(r) 179 if err != nil { 180 badStreamPanic(err) 181 } 182 packetsizei, err := strconv.Atoi(packetsize) 183 if err != nil { 184 badStreamPanicf("Invalid Packet size value returned from server (%s): %s", packetsize, err.Error()) 185 } 186 sess.buf.ResizeBuffer(packetsizei) 187 case envSortId: 188 // currently ignored 189 // new value 190 if _, err = readBVarChar(r); err != nil { 191 badStreamPanic(err) 192 } 193 // old value, should be 0 194 if _, err = readBVarChar(r); err != nil { 195 badStreamPanic(err) 196 } 197 case envSortFlags: 198 // currently ignored 199 // new value 200 if _, err = readBVarChar(r); err != nil { 201 badStreamPanic(err) 202 } 203 // old value, should be 0 204 if _, err = readBVarChar(r); err != nil { 205 badStreamPanic(err) 206 } 207 case envSqlCollation: 208 // currently ignored 209 var collationSize uint8 210 err = binary.Read(r, binary.LittleEndian, &collationSize) 211 if err != nil { 212 badStreamPanic(err) 213 } 214 215 // SQL Collation data should contain 5 bytes in length 216 if collationSize != 5 { 217 badStreamPanicf("Invalid SQL Collation size value returned from server: %d", collationSize) 218 } 219 220 // 4 bytes, contains: LCID ColFlags Version 221 var info uint32 222 err = binary.Read(r, binary.LittleEndian, &info) 223 if err != nil { 224 badStreamPanic(err) 225 } 226 227 // 1 byte, contains: sortID 228 var sortID uint8 229 err = binary.Read(r, binary.LittleEndian, &sortID) 230 if err != nil { 231 badStreamPanic(err) 232 } 233 234 // old value, should be 0 235 if _, err = readBVarChar(r); err != nil { 236 badStreamPanic(err) 237 } 238 case envTypBeginTran: 239 tranid, err := readBVarByte(r) 240 if len(tranid) != 8 { 241 badStreamPanicf("invalid size of transaction identifier: %d", len(tranid)) 242 } 243 sess.tranid = binary.LittleEndian.Uint64(tranid) 244 if err != nil { 245 badStreamPanic(err) 246 } 247 if sess.logFlags&logTransaction != 0 { 248 sess.log.Printf("BEGIN TRANSACTION %x\n", sess.tranid) 249 } 250 _, err = readBVarByte(r) 251 if err != nil { 252 badStreamPanic(err) 253 } 254 case envTypCommitTran, envTypRollbackTran: 255 _, err = readBVarByte(r) 256 if err != nil { 257 badStreamPanic(err) 258 } 259 _, err = readBVarByte(r) 260 if err != nil { 261 badStreamPanic(err) 262 } 263 if sess.logFlags&logTransaction != 0 { 264 if envtype == envTypCommitTran { 265 sess.log.Printf("COMMIT TRANSACTION %x\n", sess.tranid) 266 } else { 267 sess.log.Printf("ROLLBACK TRANSACTION %x\n", sess.tranid) 268 } 269 } 270 sess.tranid = 0 271 case envEnlistDTC: 272 // currently ignored 273 // new value, should be 0 274 if _, err = readBVarChar(r); err != nil { 275 badStreamPanic(err) 276 } 277 // old value 278 if _, err = readBVarChar(r); err != nil { 279 badStreamPanic(err) 280 } 281 case envDefectTran: 282 // currently ignored 283 // new value 284 if _, err = readBVarChar(r); err != nil { 285 badStreamPanic(err) 286 } 287 // old value, should be 0 288 if _, err = readBVarChar(r); err != nil { 289 badStreamPanic(err) 290 } 291 case envDatabaseMirrorPartner: 292 sess.partner, err = readBVarChar(r) 293 if err != nil { 294 badStreamPanic(err) 295 } 296 _, err = readBVarChar(r) 297 if err != nil { 298 badStreamPanic(err) 299 } 300 case envPromoteTran: 301 // currently ignored 302 // old value, should be 0 303 if _, err = readBVarChar(r); err != nil { 304 badStreamPanic(err) 305 } 306 // dtc token 307 // spec says it should be L_VARBYTE, so this code might be wrong 308 if _, err = readBVarChar(r); err != nil { 309 badStreamPanic(err) 310 } 311 case envTranMgrAddr: 312 // currently ignored 313 // old value, should be 0 314 if _, err = readBVarChar(r); err != nil { 315 badStreamPanic(err) 316 } 317 // XACT_MANAGER_ADDRESS = B_VARBYTE 318 if _, err = readBVarChar(r); err != nil { 319 badStreamPanic(err) 320 } 321 case envTranEnded: 322 // currently ignored 323 // old value, B_VARBYTE 324 if _, err = readBVarChar(r); err != nil { 325 badStreamPanic(err) 326 } 327 // should be 0 328 if _, err = readBVarChar(r); err != nil { 329 badStreamPanic(err) 330 } 331 case envResetConnAck: 332 // currently ignored 333 // old value, should be 0 334 if _, err = readBVarChar(r); err != nil { 335 badStreamPanic(err) 336 } 337 // should be 0 338 if _, err = readBVarChar(r); err != nil { 339 badStreamPanic(err) 340 } 341 case envStartedInstanceName: 342 // currently ignored 343 // old value, should be 0 344 if _, err = readBVarChar(r); err != nil { 345 badStreamPanic(err) 346 } 347 // instance name 348 if _, err = readBVarChar(r); err != nil { 349 badStreamPanic(err) 350 } 351 case envRouting: 352 // RoutingData message is: 353 // ValueLength USHORT 354 // Protocol (TCP = 0) BYTE 355 // ProtocolProperty (new port) USHORT 356 // AlternateServer US_VARCHAR 357 _, err := readUshort(r) 358 if err != nil { 359 badStreamPanic(err) 360 } 361 protocol, err := readByte(r) 362 if err != nil || protocol != 0 { 363 badStreamPanic(err) 364 } 365 newPort, err := readUshort(r) 366 if err != nil { 367 badStreamPanic(err) 368 } 369 newServer, err := readUsVarChar(r) 370 if err != nil { 371 badStreamPanic(err) 372 } 373 // consume the OLDVALUE = %x00 %x00 374 _, err = readUshort(r) 375 if err != nil { 376 badStreamPanic(err) 377 } 378 sess.routedServer = newServer 379 sess.routedPort = newPort 380 default: 381 // ignore rest of records because we don't know how to skip those 382 sess.log.Printf("WARN: Unknown ENVCHANGE record detected with type id = %d\n", envtype) 383 break 384 } 385 386 } 387} 388 389// http://msdn.microsoft.com/en-us/library/dd358180.aspx 390func parseReturnStatus(r *tdsBuffer) ReturnStatus { 391 return ReturnStatus(r.int32()) 392} 393 394func parseOrder(r *tdsBuffer) (res orderStruct) { 395 len := int(r.uint16()) 396 res.ColIds = make([]uint16, len/2) 397 for i := 0; i < len/2; i++ { 398 res.ColIds[i] = r.uint16() 399 } 400 return res 401} 402 403// https://msdn.microsoft.com/en-us/library/dd340421.aspx 404func parseDone(r *tdsBuffer) (res doneStruct) { 405 res.Status = r.uint16() 406 res.CurCmd = r.uint16() 407 res.RowCount = r.uint64() 408 return res 409} 410 411// https://msdn.microsoft.com/en-us/library/dd340553.aspx 412func parseDoneInProc(r *tdsBuffer) (res doneInProcStruct) { 413 res.Status = r.uint16() 414 res.CurCmd = r.uint16() 415 res.RowCount = r.uint64() 416 return res 417} 418 419type sspiMsg []byte 420 421func parseSSPIMsg(r *tdsBuffer) sspiMsg { 422 size := r.uint16() 423 buf := make([]byte, size) 424 r.ReadFull(buf) 425 return sspiMsg(buf) 426} 427 428type loginAckStruct struct { 429 Interface uint8 430 TDSVersion uint32 431 ProgName string 432 ProgVer uint32 433} 434 435func parseLoginAck(r *tdsBuffer) loginAckStruct { 436 size := r.uint16() 437 buf := make([]byte, size) 438 r.ReadFull(buf) 439 var res loginAckStruct 440 res.Interface = buf[0] 441 res.TDSVersion = binary.BigEndian.Uint32(buf[1:]) 442 prognamelen := buf[1+4] 443 var err error 444 if res.ProgName, err = ucs22str(buf[1+4+1 : 1+4+1+prognamelen*2]); err != nil { 445 badStreamPanic(err) 446 } 447 res.ProgVer = binary.BigEndian.Uint32(buf[size-4:]) 448 return res 449} 450 451// https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-tds/2eb82f8e-11f0-46dc-b42d-27302fa4701a 452func parseFeatureExtAck(r *tdsBuffer) { 453 // at most 1 featureAck per feature in featureExt 454 // go-mssqldb will add at most 1 feature, the spec defines 7 different features 455 for i := 0; i < 8; i++ { 456 featureID := r.byte() // FeatureID 457 if featureID == 0xff { 458 return 459 } 460 size := r.uint32() // FeatureAckDataLen 461 d := make([]byte, size) 462 r.ReadFull(d) 463 } 464 panic("parsed more than 7 featureAck's, protocol implementation error?") 465} 466 467// http://msdn.microsoft.com/en-us/library/dd357363.aspx 468func parseColMetadata72(r *tdsBuffer) (columns []columnStruct) { 469 count := r.uint16() 470 if count == 0xffff { 471 // no metadata is sent 472 return nil 473 } 474 columns = make([]columnStruct, count) 475 for i := range columns { 476 column := &columns[i] 477 column.UserType = r.uint32() 478 column.Flags = r.uint16() 479 480 // parsing TYPE_INFO structure 481 column.ti = readTypeInfo(r) 482 column.ColName = r.BVarChar() 483 } 484 return columns 485} 486 487// http://msdn.microsoft.com/en-us/library/dd357254.aspx 488func parseRow(r *tdsBuffer, columns []columnStruct, row []interface{}) { 489 for i, column := range columns { 490 row[i] = column.ti.Reader(&column.ti, r) 491 } 492} 493 494// http://msdn.microsoft.com/en-us/library/dd304783.aspx 495func parseNbcRow(r *tdsBuffer, columns []columnStruct, row []interface{}) { 496 bitlen := (len(columns) + 7) / 8 497 pres := make([]byte, bitlen) 498 r.ReadFull(pres) 499 for i, col := range columns { 500 if pres[i/8]&(1<<(uint(i)%8)) != 0 { 501 row[i] = nil 502 continue 503 } 504 row[i] = col.ti.Reader(&col.ti, r) 505 } 506} 507 508// http://msdn.microsoft.com/en-us/library/dd304156.aspx 509func parseError72(r *tdsBuffer) (res Error) { 510 length := r.uint16() 511 _ = length // ignore length 512 res.Number = r.int32() 513 res.State = r.byte() 514 res.Class = r.byte() 515 res.Message = r.UsVarChar() 516 res.ServerName = r.BVarChar() 517 res.ProcName = r.BVarChar() 518 res.LineNo = r.int32() 519 return 520} 521 522// http://msdn.microsoft.com/en-us/library/dd304156.aspx 523func parseInfo(r *tdsBuffer) (res Error) { 524 length := r.uint16() 525 _ = length // ignore length 526 res.Number = r.int32() 527 res.State = r.byte() 528 res.Class = r.byte() 529 res.Message = r.UsVarChar() 530 res.ServerName = r.BVarChar() 531 res.ProcName = r.BVarChar() 532 res.LineNo = r.int32() 533 return 534} 535 536// https://msdn.microsoft.com/en-us/library/dd303881.aspx 537func parseReturnValue(r *tdsBuffer) (nv namedValue) { 538 /* 539 ParamOrdinal 540 ParamName 541 Status 542 UserType 543 Flags 544 TypeInfo 545 CryptoMetadata 546 Value 547 */ 548 r.uint16() 549 nv.Name = r.BVarChar() 550 r.byte() 551 r.uint32() // UserType (uint16 prior to 7.2) 552 r.uint16() 553 ti := readTypeInfo(r) 554 nv.Value = ti.Reader(&ti, r) 555 return 556} 557 558func processSingleResponse(sess *tdsSession, ch chan tokenStruct, outs map[string]interface{}) { 559 defer func() { 560 if err := recover(); err != nil { 561 if sess.logFlags&logErrors != 0 { 562 sess.log.Printf("ERROR: Intercepted panic %v", err) 563 } 564 ch <- err 565 } 566 close(ch) 567 }() 568 569 packet_type, err := sess.buf.BeginRead() 570 if err != nil { 571 if sess.logFlags&logErrors != 0 { 572 sess.log.Printf("ERROR: BeginRead failed %v", err) 573 } 574 ch <- err 575 return 576 } 577 if packet_type != packReply { 578 badStreamPanic(fmt.Errorf("unexpected packet type in reply: got %v, expected %v", packet_type, packReply)) 579 } 580 var columns []columnStruct 581 errs := make([]Error, 0, 5) 582 for { 583 token := token(sess.buf.byte()) 584 if sess.logFlags&logDebug != 0 { 585 sess.log.Printf("got token %v", token) 586 } 587 switch token { 588 case tokenSSPI: 589 ch <- parseSSPIMsg(sess.buf) 590 return 591 case tokenReturnStatus: 592 returnStatus := parseReturnStatus(sess.buf) 593 ch <- returnStatus 594 case tokenLoginAck: 595 loginAck := parseLoginAck(sess.buf) 596 ch <- loginAck 597 case tokenFeatureExtAck: 598 parseFeatureExtAck(sess.buf) 599 case tokenOrder: 600 order := parseOrder(sess.buf) 601 ch <- order 602 case tokenDoneInProc: 603 done := parseDoneInProc(sess.buf) 604 if sess.logFlags&logRows != 0 && done.Status&doneCount != 0 { 605 sess.log.Printf("(%d row(s) affected)\n", done.RowCount) 606 } 607 ch <- done 608 case tokenDone, tokenDoneProc: 609 done := parseDone(sess.buf) 610 done.errors = errs 611 if sess.logFlags&logDebug != 0 { 612 sess.log.Printf("got DONE or DONEPROC status=%d", done.Status) 613 } 614 if done.Status&doneSrvError != 0 { 615 ch <- errors.New("SQL Server had internal error") 616 return 617 } 618 if sess.logFlags&logRows != 0 && done.Status&doneCount != 0 { 619 sess.log.Printf("(%d row(s) affected)\n", done.RowCount) 620 } 621 ch <- done 622 if done.Status&doneMore == 0 { 623 return 624 } 625 case tokenColMetadata: 626 columns = parseColMetadata72(sess.buf) 627 ch <- columns 628 case tokenRow: 629 row := make([]interface{}, len(columns)) 630 parseRow(sess.buf, columns, row) 631 ch <- row 632 case tokenNbcRow: 633 row := make([]interface{}, len(columns)) 634 parseNbcRow(sess.buf, columns, row) 635 ch <- row 636 case tokenEnvChange: 637 processEnvChg(sess) 638 case tokenError: 639 err := parseError72(sess.buf) 640 if sess.logFlags&logDebug != 0 { 641 sess.log.Printf("got ERROR %d %s", err.Number, err.Message) 642 } 643 errs = append(errs, err) 644 if sess.logFlags&logErrors != 0 { 645 sess.log.Println(err.Message) 646 } 647 case tokenInfo: 648 info := parseInfo(sess.buf) 649 if sess.logFlags&logDebug != 0 { 650 sess.log.Printf("got INFO %d %s", info.Number, info.Message) 651 } 652 if sess.logFlags&logMessages != 0 { 653 sess.log.Println(info.Message) 654 } 655 case tokenReturnValue: 656 nv := parseReturnValue(sess.buf) 657 if len(nv.Name) > 0 { 658 name := nv.Name[1:] // Remove the leading "@". 659 if ov, has := outs[name]; has { 660 err = scanIntoOut(name, nv.Value, ov) 661 if err != nil { 662 fmt.Println("scan error", err) 663 ch <- err 664 } 665 } 666 } 667 default: 668 badStreamPanic(fmt.Errorf("unknown token type returned: %v", token)) 669 } 670 } 671} 672 673type parseRespIter byte 674 675const ( 676 parseRespIterContinue parseRespIter = iota // Continue parsing current token. 677 parseRespIterNext // Fetch the next token. 678 parseRespIterDone // Done with parsing the response. 679) 680 681type parseRespState byte 682 683const ( 684 parseRespStateNormal parseRespState = iota // Normal response state. 685 parseRespStateCancel // Query is canceled, wait for server to confirm. 686 parseRespStateClosing // Waiting for tokens to come through. 687) 688 689type parseResp struct { 690 sess *tdsSession 691 ctxDone <-chan struct{} 692 state parseRespState 693 cancelError error 694} 695 696func (ts *parseResp) sendAttention(ch chan tokenStruct) parseRespIter { 697 if err := sendAttention(ts.sess.buf); err != nil { 698 ts.dlogf("failed to send attention signal %v", err) 699 ch <- err 700 return parseRespIterDone 701 } 702 ts.state = parseRespStateCancel 703 return parseRespIterContinue 704} 705 706func (ts *parseResp) dlog(msg string) { 707 if ts.sess.logFlags&logDebug != 0 { 708 ts.sess.log.Println(msg) 709 } 710} 711func (ts *parseResp) dlogf(f string, v ...interface{}) { 712 if ts.sess.logFlags&logDebug != 0 { 713 ts.sess.log.Printf(f, v...) 714 } 715} 716 717func (ts *parseResp) iter(ctx context.Context, ch chan tokenStruct, tokChan chan tokenStruct) parseRespIter { 718 switch ts.state { 719 default: 720 panic("unknown state") 721 case parseRespStateNormal: 722 select { 723 case tok, ok := <-tokChan: 724 if !ok { 725 ts.dlog("response finished") 726 return parseRespIterDone 727 } 728 if err, ok := tok.(net.Error); ok && err.Timeout() { 729 ts.cancelError = err 730 ts.dlog("got timeout error, sending attention signal to server") 731 return ts.sendAttention(ch) 732 } 733 // Pass the token along. 734 ch <- tok 735 return parseRespIterContinue 736 737 case <-ts.ctxDone: 738 ts.ctxDone = nil 739 ts.dlog("got cancel message, sending attention signal to server") 740 return ts.sendAttention(ch) 741 } 742 case parseRespStateCancel: // Read all responses until a DONE or error is received.Auth 743 select { 744 case tok, ok := <-tokChan: 745 if !ok { 746 ts.dlog("response finished but waiting for attention ack") 747 return parseRespIterNext 748 } 749 switch tok := tok.(type) { 750 default: 751 // Ignore all other tokens while waiting. 752 // The TDS spec says other tokens may arrive after an attention 753 // signal is sent. Ignore these tokens and continue looking for 754 // a DONE with attention confirm mark. 755 case doneStruct: 756 if tok.Status&doneAttn != 0 { 757 ts.dlog("got cancellation confirmation from server") 758 if ts.cancelError != nil { 759 ch <- ts.cancelError 760 ts.cancelError = nil 761 } else { 762 ch <- ctx.Err() 763 } 764 return parseRespIterDone 765 } 766 767 // If an error happens during cancel, pass it along and just stop. 768 // We are uncertain to receive more tokens. 769 case error: 770 ch <- tok 771 ts.state = parseRespStateClosing 772 } 773 return parseRespIterContinue 774 case <-ts.ctxDone: 775 ts.ctxDone = nil 776 ts.state = parseRespStateClosing 777 return parseRespIterContinue 778 } 779 case parseRespStateClosing: // Wait for current token chan to close. 780 if _, ok := <-tokChan; !ok { 781 ts.dlog("response finished") 782 return parseRespIterDone 783 } 784 return parseRespIterContinue 785 } 786} 787 788func processResponse(ctx context.Context, sess *tdsSession, ch chan tokenStruct, outs map[string]interface{}) { 789 ts := &parseResp{ 790 sess: sess, 791 ctxDone: ctx.Done(), 792 } 793 defer func() { 794 // Ensure any remaining error is piped through 795 // or the query may look like it executed when it actually failed. 796 if ts.cancelError != nil { 797 ch <- ts.cancelError 798 ts.cancelError = nil 799 } 800 close(ch) 801 }() 802 803 // Loop over multiple responses. 804 for { 805 ts.dlog("initiating response reading") 806 807 tokChan := make(chan tokenStruct) 808 go processSingleResponse(sess, tokChan, outs) 809 810 // Loop over multiple tokens in response. 811 tokensLoop: 812 for { 813 switch ts.iter(ctx, ch, tokChan) { 814 case parseRespIterContinue: 815 // Nothing, continue to next token. 816 case parseRespIterNext: 817 break tokensLoop 818 case parseRespIterDone: 819 return 820 } 821 } 822 } 823} 824