// Copyright 2020-2021 InfluxData, Inc. All rights reserved.
// Use of this source code is governed by MIT
// license that can be found in the LICENSE file.

package api

import (
	"bytes"
	"compress/gzip"
	"context"
	"encoding/base64"
	"encoding/csv"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
	"net/url"
	"path"
	"strconv"
	"strings"
	"sync"
	"time"

	http2 "github.com/influxdata/influxdb-client-go/v2/api/http"
	"github.com/influxdata/influxdb-client-go/v2/api/query"
	"github.com/influxdata/influxdb-client-go/v2/domain"
	"github.com/influxdata/influxdb-client-go/v2/internal/log"
	ilog "github.com/influxdata/influxdb-client-go/v2/log"
)

// Column data type names recognized by toValue when converting the textual
// cell values of a query response into Go values.
const (
	stringDatatype       = "string"
	doubleDatatype       = "double"
	boolDatatype         = "boolean"
	longDatatype         = "long"
	uLongDatatype        = "unsignedLong"
	durationDatatype     = "duration"
	base64BinaryDataType = "base64Binary"
	timeDatatypeRFC      = "dateTime:RFC3339"
	timeDatatypeRFCNano  = "dateTime:RFC3339Nano"
)

// QueryAPI provides methods for synchronously performing flux queries against an InfluxDB server.
46type QueryAPI interface { 47 // QueryRaw executes flux query on the InfluxDB server and returns complete query result as a string with table annotations according to dialect 48 QueryRaw(ctx context.Context, query string, dialect *domain.Dialect) (string, error) 49 // Query executes flux query on the InfluxDB server and returns QueryTableResult which parses streamed response into structures representing flux table parts 50 Query(ctx context.Context, query string) (*QueryTableResult, error) 51} 52 53// NewQueryAPI returns new query client for querying buckets belonging to org 54func NewQueryAPI(org string, service http2.Service) QueryAPI { 55 return &queryAPI{ 56 org: org, 57 httpService: service, 58 } 59} 60 61// queryAPI implements QueryAPI interface 62type queryAPI struct { 63 org string 64 httpService http2.Service 65 url string 66 lock sync.Mutex 67} 68 69func (q *queryAPI) QueryRaw(ctx context.Context, query string, dialect *domain.Dialect) (string, error) { 70 queryURL, err := q.queryURL() 71 if err != nil { 72 return "", err 73 } 74 queryType := domain.QueryTypeFlux 75 qr := domain.Query{Query: query, Type: &queryType, Dialect: dialect} 76 qrJSON, err := json.Marshal(qr) 77 if err != nil { 78 return "", err 79 } 80 if log.Level() >= ilog.DebugLevel { 81 log.Debugf("Query: %s", qrJSON) 82 } 83 var body string 84 perror := q.httpService.DoPostRequest(ctx, queryURL, bytes.NewReader(qrJSON), func(req *http.Request) { 85 req.Header.Set("Content-Type", "application/json") 86 req.Header.Set("Accept-Encoding", "gzip") 87 }, 88 func(resp *http.Response) error { 89 if resp.Header.Get("Content-Encoding") == "gzip" { 90 resp.Body, err = gzip.NewReader(resp.Body) 91 if err != nil { 92 return err 93 } 94 } 95 respBody, err := ioutil.ReadAll(resp.Body) 96 if err != nil { 97 return err 98 } 99 body = string(respBody) 100 return nil 101 }) 102 if perror != nil { 103 return "", perror 104 } 105 return body, nil 106} 107 108// DefaultDialect return flux query Dialect with 
full annotations (datatype, group, default), header and comma char as a delimiter 109func DefaultDialect() *domain.Dialect { 110 annotations := []domain.DialectAnnotations{domain.DialectAnnotationsDatatype, domain.DialectAnnotationsGroup, domain.DialectAnnotationsDefault} 111 delimiter := "," 112 header := true 113 return &domain.Dialect{ 114 Annotations: &annotations, 115 Delimiter: &delimiter, 116 Header: &header, 117 } 118} 119 120func (q *queryAPI) Query(ctx context.Context, query string) (*QueryTableResult, error) { 121 var queryResult *QueryTableResult 122 queryURL, err := q.queryURL() 123 if err != nil { 124 return nil, err 125 } 126 queryType := domain.QueryTypeFlux 127 qr := domain.Query{Query: query, Type: &queryType, Dialect: DefaultDialect()} 128 qrJSON, err := json.Marshal(qr) 129 if err != nil { 130 return nil, err 131 } 132 if log.Level() >= ilog.DebugLevel { 133 log.Debugf("Query: %s", qrJSON) 134 } 135 perror := q.httpService.DoPostRequest(ctx, queryURL, bytes.NewReader(qrJSON), func(req *http.Request) { 136 req.Header.Set("Content-Type", "application/json") 137 req.Header.Set("Accept-Encoding", "gzip") 138 }, 139 func(resp *http.Response) error { 140 if resp.Header.Get("Content-Encoding") == "gzip" { 141 resp.Body, err = gzip.NewReader(resp.Body) 142 if err != nil { 143 return err 144 } 145 } 146 csvReader := csv.NewReader(resp.Body) 147 csvReader.FieldsPerRecord = -1 148 queryResult = &QueryTableResult{Closer: resp.Body, csvReader: csvReader} 149 return nil 150 }) 151 if perror != nil { 152 return queryResult, perror 153 } 154 return queryResult, nil 155} 156 157func (q *queryAPI) queryURL() (string, error) { 158 if q.url == "" { 159 u, err := url.Parse(q.httpService.ServerAPIURL()) 160 if err != nil { 161 return "", err 162 } 163 u.Path = path.Join(u.Path, "query") 164 165 params := u.Query() 166 params.Set("org", q.org) 167 u.RawQuery = params.Encode() 168 q.lock.Lock() 169 q.url = u.String() 170 q.lock.Unlock() 171 } 172 return q.url, nil 173} 
174 175// QueryTableResult parses streamed flux query response into structures representing flux table parts 176// Walking though the result is done by repeatedly calling Next() until returns false. 177// Actual flux table info (columns with names, data types, etc) is returned by TableMetadata() method. 178// Data are acquired by Record() method. 179// Preliminary end can be caused by an error, so when Next() return false, check Err() for an error 180type QueryTableResult struct { 181 io.Closer 182 csvReader *csv.Reader 183 tablePosition int 184 tableChanged bool 185 table *query.FluxTableMetadata 186 record *query.FluxRecord 187 err error 188} 189 190// TablePosition returns actual flux table position in the result, or -1 if no table was found yet 191// Each new table is introduced by an annotation in csv 192func (q *QueryTableResult) TablePosition() int { 193 if q.table != nil { 194 return q.table.Position() 195 } 196 return -1 197} 198 199// TableMetadata returns actual flux table metadata 200func (q *QueryTableResult) TableMetadata() *query.FluxTableMetadata { 201 return q.table 202} 203 204// TableChanged returns true if last call of Next() found also new result table 205// Table information is available via TableMetadata method 206func (q *QueryTableResult) TableChanged() bool { 207 return q.tableChanged 208} 209 210// Record returns last parsed flux table data row 211// Use Record methods to access value and row properties 212func (q *QueryTableResult) Record() *query.FluxRecord { 213 return q.record 214} 215 216type parsingState int 217 218const ( 219 parsingStateNormal parsingState = iota 220 parsingStateAnnotation 221 parsingStateNameRow 222 parsingStateError 223) 224 225// Next advances to next row in query result. 
226// During the first time it is called, Next creates also table metadata 227// Actual parsed row is available through Record() function 228// Returns false in case of end or an error, otherwise true 229func (q *QueryTableResult) Next() bool { 230 var row []string 231 // set closing query in case of preliminary return 232 closer := func() { 233 if err := q.Close(); err != nil { 234 message := err.Error() 235 if q.err != nil { 236 message = fmt.Sprintf("%s,%s", message, q.err.Error()) 237 } 238 q.err = errors.New(message) 239 } 240 } 241 defer func() { 242 closer() 243 }() 244 parsingState := parsingStateNormal 245 q.tableChanged = false 246 dataTypeAnnotationFound := false 247readRow: 248 row, q.err = q.csvReader.Read() 249 if q.err == io.EOF { 250 q.err = nil 251 return false 252 } 253 if q.err != nil { 254 return false 255 } 256 257 if len(row) <= 1 { 258 goto readRow 259 } 260 if len(row[0]) > 0 && row[0][0] == '#' { 261 if parsingState == parsingStateNormal { 262 q.table = query.NewFluxTableMetadata(q.tablePosition) 263 q.tablePosition++ 264 q.tableChanged = true 265 for i := range row[1:] { 266 q.table.AddColumn(query.NewFluxColumn(i)) 267 } 268 parsingState = parsingStateAnnotation 269 } 270 } 271 if q.table == nil { 272 q.err = errors.New("parsing error, annotations not found") 273 return false 274 } 275 if len(row)-1 != len(q.table.Columns()) { 276 q.err = fmt.Errorf("parsing error, row has different number of columns than the table: %d vs %d", len(row)-1, len(q.table.Columns())) 277 return false 278 } 279 switch row[0] { 280 case "": 281 switch parsingState { 282 case parsingStateAnnotation: 283 if !dataTypeAnnotationFound { 284 q.err = errors.New("parsing error, datatype annotation not found") 285 return false 286 } 287 parsingState = parsingStateNameRow 288 fallthrough 289 case parsingStateNameRow: 290 if row[1] == "error" { 291 parsingState = parsingStateError 292 } else { 293 for i, n := range row[1:] { 294 if q.table.Column(i) != nil { 295 
q.table.Column(i).SetName(n) 296 } 297 } 298 parsingState = parsingStateNormal 299 } 300 goto readRow 301 case parsingStateError: 302 var message string 303 if len(row) > 1 && len(row[1]) > 0 { 304 message = row[1] 305 } else { 306 message = "unknown query error" 307 } 308 reference := "" 309 if len(row) > 2 && len(row[2]) > 0 { 310 reference = fmt.Sprintf(",%s", row[2]) 311 } 312 q.err = fmt.Errorf("%s%s", message, reference) 313 return false 314 } 315 values := make(map[string]interface{}) 316 for i, v := range row[1:] { 317 if q.table.Column(i) != nil { 318 values[q.table.Column(i).Name()], q.err = toValue(stringTernary(v, q.table.Column(i).DefaultValue()), q.table.Column(i).DataType(), q.table.Column(i).Name()) 319 if q.err != nil { 320 return false 321 } 322 } 323 } 324 q.record = query.NewFluxRecord(q.table.Position(), values) 325 case "#datatype": 326 dataTypeAnnotationFound = true 327 for i, d := range row[1:] { 328 if q.table.Column(i) != nil { 329 q.table.Column(i).SetDataType(d) 330 } 331 } 332 goto readRow 333 case "#group": 334 for i, g := range row[1:] { 335 if q.table.Column(i) != nil { 336 q.table.Column(i).SetGroup(g == "true") 337 } 338 } 339 goto readRow 340 case "#default": 341 for i, c := range row[1:] { 342 if q.table.Column(i) != nil { 343 q.table.Column(i).SetDefaultValue(c) 344 } 345 } 346 goto readRow 347 } 348 // don't close query 349 closer = func() {} 350 return true 351} 352 353// Err returns an error raised during flux query response parsing 354func (q *QueryTableResult) Err() error { 355 return q.err 356} 357 358// Close reads remaining data and closes underlying Closer 359func (q *QueryTableResult) Close() error { 360 var err error 361 for err == nil { 362 _, err = q.csvReader.Read() 363 } 364 return q.Closer.Close() 365} 366 367// stringTernary returns a if not empty, otherwise b 368func stringTernary(a, b string) string { 369 if a == "" { 370 return b 371 } 372 return a 373} 374 375// toValues converts s into type by t 376func 
toValue(s, t, name string) (interface{}, error) { 377 if s == "" { 378 return nil, nil 379 } 380 switch t { 381 case stringDatatype: 382 return s, nil 383 case timeDatatypeRFC: 384 return time.Parse(time.RFC3339, s) 385 case timeDatatypeRFCNano: 386 return time.Parse(time.RFC3339Nano, s) 387 case durationDatatype: 388 return time.ParseDuration(s) 389 case doubleDatatype: 390 return strconv.ParseFloat(s, 64) 391 case boolDatatype: 392 if strings.ToLower(s) == "false" { 393 return false, nil 394 } 395 return true, nil 396 case longDatatype: 397 return strconv.ParseInt(s, 10, 64) 398 case uLongDatatype: 399 return strconv.ParseUint(s, 10, 64) 400 case base64BinaryDataType: 401 return base64.StdEncoding.DecodeString(s) 402 default: 403 return nil, fmt.Errorf("%s has unknown data type %s", name, t) 404 } 405} 406