1// Copyright (c) 2017-2021 Snowflake Computing Inc. All right reserved. 2 3package gosnowflake 4 5import ( 6 "database/sql/driver" 7 "io" 8 "reflect" 9 "strings" 10) 11 12const ( 13 headerSseCAlgorithm = "x-amz-server-side-encryption-customer-algorithm" 14 headerSseCKey = "x-amz-server-side-encryption-customer-key" 15 headerSseCAes = "AES256" 16) 17 18var ( 19 // MaxChunkDownloadWorkers specifies the maximum number of goroutines used to download chunks 20 MaxChunkDownloadWorkers = 10 21 22 // CustomJSONDecoderEnabled has the chunk downloader use the custom JSON decoder to reduce memory footprint. 23 CustomJSONDecoderEnabled = false 24) 25 26var ( 27 maxChunkDownloaderErrorCounter = 5 28) 29 30type snowflakeRows struct { 31 sc *snowflakeConn 32 ChunkDownloader chunkDownloader 33 tailChunkDownloader chunkDownloader 34 queryID string 35 status queryStatus 36 err error 37 errChannel chan error 38} 39 40type snowflakeValue interface{} 41 42type chunkRowType struct { 43 RowSet []*string 44 ArrowRow []snowflakeValue 45} 46 47type rowSetType struct { 48 RowType []execResponseRowType 49 JSON [][]*string 50 RowSetBase64 string 51} 52 53type chunkError struct { 54 Index int 55 Error error 56} 57 58func (rows *snowflakeRows) Close() (err error) { 59 if err := rows.waitForAsyncQueryStatus(); err != nil { 60 return err 61 } 62 logger.WithContext(rows.sc.ctx).Debugln("Rows.Close") 63 return nil 64} 65 66// ColumnTypeDatabaseTypeName returns the database column name. 67func (rows *snowflakeRows) ColumnTypeDatabaseTypeName(index int) string { 68 if err := rows.waitForAsyncQueryStatus(); err != nil { 69 return err.Error() 70 } 71 return strings.ToUpper(rows.ChunkDownloader.getRowType()[index].Type) 72} 73 74// ColumnTypeLength returns the length of the column 75func (rows *snowflakeRows) ColumnTypeLength(index int) (length int64, ok bool) { 76 if err := rows.waitForAsyncQueryStatus(); err != nil { 77 return 0, false 78 } 79 if index < 0 || index > len(rows.ChunkDownloader.getRowType()) { 80 return 0, false 81 } 82 switch rows.ChunkDownloader.getRowType()[index].Type { 83 case "text", "variant", "object", "array", "binary": 84 return rows.ChunkDownloader.getRowType()[index].Length, true 85 } 86 return 0, false 87} 88 89func (rows *snowflakeRows) ColumnTypeNullable(index int) (nullable, ok bool) { 90 if err := rows.waitForAsyncQueryStatus(); err != nil { 91 return false, false 92 } 93 if index < 0 || index > len(rows.ChunkDownloader.getRowType()) { 94 return false, false 95 } 96 return rows.ChunkDownloader.getRowType()[index].Nullable, true 97} 98 99func (rows *snowflakeRows) ColumnTypePrecisionScale(index int) (precision, scale int64, ok bool) { 100 if err := rows.waitForAsyncQueryStatus(); err != nil { 101 return 0, 0, false 102 } 103 rowType := rows.ChunkDownloader.getRowType() 104 if index < 0 || index > len(rowType) { 105 return 0, 0, false 106 } 107 switch rowType[index].Type { 108 case "fixed": 109 return rowType[index].Precision, rowType[index].Scale, true 110 case "time": 111 return rowType[index].Scale, 0, true 112 case "timestamp": 113 return rowType[index].Scale, 0, true 114 } 115 return 0, 0, false 116} 117 118func (rows *snowflakeRows) Columns() []string { 119 if err := rows.waitForAsyncQueryStatus(); err != nil { 120 return make([]string, 0) 121 } 122 logger.Debug("Rows.Columns") 123 ret := make([]string, len(rows.ChunkDownloader.getRowType())) 124 for i, n := 0, len(rows.ChunkDownloader.getRowType()); i < n; i++ { 125 ret[i] = rows.ChunkDownloader.getRowType()[i].Name 126 } 127 return ret 128} 129 130func (rows *snowflakeRows) ColumnTypeScanType(index int) reflect.Type { 131 if err := rows.waitForAsyncQueryStatus(); err != nil { 132 return nil 133 } 134 return snowflakeTypeToGo( 135 getSnowflakeType(strings.ToUpper(rows.ChunkDownloader.getRowType()[index].Type)), 136 rows.ChunkDownloader.getRowType()[index].Scale) 137} 138 139func (rows *snowflakeRows) GetQueryID() string { 140 return rows.queryID 141} 142 143func (rows *snowflakeRows) GetStatus() queryStatus { 144 return rows.status 145} 146 147func (rows *snowflakeRows) Next(dest []driver.Value) (err error) { 148 if err := rows.waitForAsyncQueryStatus(); err != nil { 149 return err 150 } 151 row, err := rows.ChunkDownloader.next() 152 if err != nil { 153 // includes io.EOF 154 if err == io.EOF { 155 rows.ChunkDownloader.reset() 156 } 157 return err 158 } 159 160 if rows.ChunkDownloader.getQueryResultFormat() == arrowFormat { 161 for i, n := 0, len(row.ArrowRow); i < n; i++ { 162 dest[i] = row.ArrowRow[i] 163 } 164 } else { 165 for i, n := 0, len(row.RowSet); i < n; i++ { 166 // could move to chunk downloader so that each go routine 167 // can convert data 168 err := stringToValue(&dest[i], rows.ChunkDownloader.getRowType()[i], row.RowSet[i]) 169 if err != nil { 170 return err 171 } 172 } 173 } 174 return err 175} 176 177func (rows *snowflakeRows) HasNextResultSet() bool { 178 if err := rows.waitForAsyncQueryStatus(); err != nil { 179 return false 180 } 181 return rows.ChunkDownloader.hasNextResultSet() 182} 183 184func (rows *snowflakeRows) NextResultSet() error { 185 if err := rows.waitForAsyncQueryStatus(); err != nil { 186 return err 187 } 188 if len(rows.ChunkDownloader.getChunkMetas()) == 0 { 189 if rows.ChunkDownloader.getNextChunkDownloader() == nil { 190 return io.EOF 191 } 192 rows.ChunkDownloader = rows.ChunkDownloader.getNextChunkDownloader() 193 rows.ChunkDownloader.start() 194 } 195 return rows.ChunkDownloader.nextResultSet() 196} 197 198func (rows *snowflakeRows) waitForAsyncQueryStatus() error { 199 // if async query, block until query is finished 200 if rows.status == QueryStatusInProgress { 201 err := <-rows.errChannel 202 rows.status = QueryStatusComplete 203 if err != nil { 204 rows.status = QueryFailed 205 rows.err = err 206 return rows.err 207 } 208 } else if rows.status == QueryFailed { 209 return rows.err 210 } 211 return nil 212} 213 214func (rows *snowflakeRows) addDownloader(newDL chunkDownloader) { 215 if rows.ChunkDownloader == nil { 216 rows.ChunkDownloader = newDL 217 rows.tailChunkDownloader = newDL 218 return 219 } 220 rows.tailChunkDownloader.setNextChunkDownloader(newDL) 221 rows.tailChunkDownloader = newDL 222} 223