1// Copyright (c) 2017-2021 Snowflake Computing Inc. All right reserved.
2
3package gosnowflake
4
5import (
6	"database/sql/driver"
7	"io"
8	"reflect"
9	"strings"
10)
11
// HTTP header names, and one header value, following the Amazon "x-amz-"
// convention for server-side encryption with customer-provided keys.
// headerSseCAlgorithm names the algorithm header, headerSseCKey names the
// key header, and headerSseCAes is the algorithm value "AES256".
// They are not used in this file; presumably they are applied to chunk
// download requests — TODO confirm against the downloader code.
const (
	headerSseCAlgorithm = "x-amz-server-side-encryption-customer-algorithm"
	headerSseCKey       = "x-amz-server-side-encryption-customer-key"
	headerSseCAes       = "AES256"
)
17
var (
	// MaxChunkDownloadWorkers specifies the maximum number of goroutines used
	// to download chunks. The default is 10.
	MaxChunkDownloadWorkers = 10

	// CustomJSONDecoderEnabled, when true, makes the chunk downloader use the
	// custom JSON decoder to reduce memory footprint. It is off by default.
	CustomJSONDecoderEnabled = false
)
25
var (
	// maxChunkDownloaderErrorCounter is not referenced in this file. Judging
	// by its name it bounds how many errors the chunk downloader tolerates
	// before giving up — TODO confirm against the downloader code.
	maxChunkDownloaderErrorCounter = 5
)
29
// snowflakeRows is the result-set cursor whose methods are defined in this
// file (Close, Columns, Next, NextResultSet and the ColumnType* accessors).
//
// Fields:
//   - sc: the owning connection; Close reads sc.ctx for logging.
//   - ChunkDownloader: the downloader currently serving column metadata and
//     rows; NextResultSet may replace it with the next one in the chain.
//   - tailChunkDownloader: the last downloader appended by addDownloader.
//   - queryID: the value returned by GetQueryID.
//   - status: the query state; waitForAsyncQueryStatus reads and updates it.
//   - err: the error recorded when the query is marked as failed.
//   - errChannel: the channel waitForAsyncQueryStatus receives from while
//     status is QueryStatusInProgress.
type snowflakeRows struct {
	sc                  *snowflakeConn
	ChunkDownloader     chunkDownloader
	tailChunkDownloader chunkDownloader
	queryID             string
	status              queryStatus
	err                 error
	errChannel          chan error
}
39
// snowflakeValue is an untyped cell value; chunkRowType.ArrowRow holds one
// per column.
type snowflakeValue interface{}
41
// chunkRowType is a single row as produced by a chunk downloader.
// snowflakeRows.Next reads ArrowRow when the downloader reports the arrow
// result format and RowSet (one nullable string per column) otherwise.
type chunkRowType struct {
	RowSet   []*string
	ArrowRow []snowflakeValue
}
46
// rowSetType groups column metadata (RowType) with row data held in two
// forms: JSON and RowSetBase64. It is not referenced elsewhere in this file.
// Judging by the field names, JSON holds rows of nullable strings and
// RowSetBase64 a base64-encoded row set — TODO confirm against the code that
// populates it.
type rowSetType struct {
	RowType      []execResponseRowType
	JSON         [][]*string
	RowSetBase64 string
}
52
// chunkError pairs an error with an integer index. It is not referenced
// elsewhere in this file; presumably Index identifies the chunk whose
// download failed — TODO confirm against the downloader code.
type chunkError struct {
	Index int
	Error error
}
57
58func (rows *snowflakeRows) Close() (err error) {
59	if err := rows.waitForAsyncQueryStatus(); err != nil {
60		return err
61	}
62	logger.WithContext(rows.sc.ctx).Debugln("Rows.Close")
63	return nil
64}
65
66// ColumnTypeDatabaseTypeName returns the database column name.
67func (rows *snowflakeRows) ColumnTypeDatabaseTypeName(index int) string {
68	if err := rows.waitForAsyncQueryStatus(); err != nil {
69		return err.Error()
70	}
71	return strings.ToUpper(rows.ChunkDownloader.getRowType()[index].Type)
72}
73
74// ColumnTypeLength returns the length of the column
75func (rows *snowflakeRows) ColumnTypeLength(index int) (length int64, ok bool) {
76	if err := rows.waitForAsyncQueryStatus(); err != nil {
77		return 0, false
78	}
79	if index < 0 || index > len(rows.ChunkDownloader.getRowType()) {
80		return 0, false
81	}
82	switch rows.ChunkDownloader.getRowType()[index].Type {
83	case "text", "variant", "object", "array", "binary":
84		return rows.ChunkDownloader.getRowType()[index].Length, true
85	}
86	return 0, false
87}
88
89func (rows *snowflakeRows) ColumnTypeNullable(index int) (nullable, ok bool) {
90	if err := rows.waitForAsyncQueryStatus(); err != nil {
91		return false, false
92	}
93	if index < 0 || index > len(rows.ChunkDownloader.getRowType()) {
94		return false, false
95	}
96	return rows.ChunkDownloader.getRowType()[index].Nullable, true
97}
98
99func (rows *snowflakeRows) ColumnTypePrecisionScale(index int) (precision, scale int64, ok bool) {
100	if err := rows.waitForAsyncQueryStatus(); err != nil {
101		return 0, 0, false
102	}
103	rowType := rows.ChunkDownloader.getRowType()
104	if index < 0 || index > len(rowType) {
105		return 0, 0, false
106	}
107	switch rowType[index].Type {
108	case "fixed":
109		return rowType[index].Precision, rowType[index].Scale, true
110	case "time":
111		return rowType[index].Scale, 0, true
112	case "timestamp":
113		return rowType[index].Scale, 0, true
114	}
115	return 0, 0, false
116}
117
118func (rows *snowflakeRows) Columns() []string {
119	if err := rows.waitForAsyncQueryStatus(); err != nil {
120		return make([]string, 0)
121	}
122	logger.Debug("Rows.Columns")
123	ret := make([]string, len(rows.ChunkDownloader.getRowType()))
124	for i, n := 0, len(rows.ChunkDownloader.getRowType()); i < n; i++ {
125		ret[i] = rows.ChunkDownloader.getRowType()[i].Name
126	}
127	return ret
128}
129
130func (rows *snowflakeRows) ColumnTypeScanType(index int) reflect.Type {
131	if err := rows.waitForAsyncQueryStatus(); err != nil {
132		return nil
133	}
134	return snowflakeTypeToGo(
135		getSnowflakeType(strings.ToUpper(rows.ChunkDownloader.getRowType()[index].Type)),
136		rows.ChunkDownloader.getRowType()[index].Scale)
137}
138
// GetQueryID returns the query ID stored on rows. It does not wait for an
// asynchronous query to finish.
func (rows *snowflakeRows) GetQueryID() string {
	return rows.queryID
}
142
// GetStatus returns the query status currently recorded on rows. It does not
// wait for an asynchronous query to finish, so the result may still be
// QueryStatusInProgress.
func (rows *snowflakeRows) GetStatus() queryStatus {
	return rows.status
}
146
147func (rows *snowflakeRows) Next(dest []driver.Value) (err error) {
148	if err := rows.waitForAsyncQueryStatus(); err != nil {
149		return err
150	}
151	row, err := rows.ChunkDownloader.next()
152	if err != nil {
153		// includes io.EOF
154		if err == io.EOF {
155			rows.ChunkDownloader.reset()
156		}
157		return err
158	}
159
160	if rows.ChunkDownloader.getQueryResultFormat() == arrowFormat {
161		for i, n := 0, len(row.ArrowRow); i < n; i++ {
162			dest[i] = row.ArrowRow[i]
163		}
164	} else {
165		for i, n := 0, len(row.RowSet); i < n; i++ {
166			// could move to chunk downloader so that each go routine
167			// can convert data
168			err := stringToValue(&dest[i], rows.ChunkDownloader.getRowType()[i], row.RowSet[i])
169			if err != nil {
170				return err
171			}
172		}
173	}
174	return err
175}
176
177func (rows *snowflakeRows) HasNextResultSet() bool {
178	if err := rows.waitForAsyncQueryStatus(); err != nil {
179		return false
180	}
181	return rows.ChunkDownloader.hasNextResultSet()
182}
183
184func (rows *snowflakeRows) NextResultSet() error {
185	if err := rows.waitForAsyncQueryStatus(); err != nil {
186		return err
187	}
188	if len(rows.ChunkDownloader.getChunkMetas()) == 0 {
189		if rows.ChunkDownloader.getNextChunkDownloader() == nil {
190			return io.EOF
191		}
192		rows.ChunkDownloader = rows.ChunkDownloader.getNextChunkDownloader()
193		rows.ChunkDownloader.start()
194	}
195	return rows.ChunkDownloader.nextResultSet()
196}
197
198func (rows *snowflakeRows) waitForAsyncQueryStatus() error {
199	// if async query, block until query is finished
200	if rows.status == QueryStatusInProgress {
201		err := <-rows.errChannel
202		rows.status = QueryStatusComplete
203		if err != nil {
204			rows.status = QueryFailed
205			rows.err = err
206			return rows.err
207		}
208	} else if rows.status == QueryFailed {
209		return rows.err
210	}
211	return nil
212}
213
214func (rows *snowflakeRows) addDownloader(newDL chunkDownloader) {
215	if rows.ChunkDownloader == nil {
216		rows.ChunkDownloader = newDL
217		rows.tailChunkDownloader = newDL
218		return
219	}
220	rows.tailChunkDownloader.setNextChunkDownloader(newDL)
221	rows.tailChunkDownloader = newDL
222}
223