1// Copyright 2020-2021 InfluxData, Inc. All rights reserved.
2// Use of this source code is governed by MIT
3// license that can be found in the LICENSE file.
4
5package api
6
7import (
8	"bytes"
9	"compress/gzip"
10	"context"
11	"encoding/base64"
12	"encoding/csv"
13	"encoding/json"
14	"errors"
15	"fmt"
16	"io"
17	"io/ioutil"
18	"net/http"
19	"net/url"
20	"path"
21	"strconv"
22	"strings"
23	"sync"
24	"time"
25
26	http2 "github.com/influxdata/influxdb-client-go/v2/api/http"
27	"github.com/influxdata/influxdb-client-go/v2/api/query"
28	"github.com/influxdata/influxdb-client-go/v2/domain"
29	"github.com/influxdata/influxdb-client-go/v2/internal/log"
30	ilog "github.com/influxdata/influxdb-client-go/v2/log"
31)
32
33const (
34	stringDatatype       = "string"
35	doubleDatatype       = "double"
36	boolDatatype         = "boolean"
37	longDatatype         = "long"
38	uLongDatatype        = "unsignedLong"
39	durationDatatype     = "duration"
40	base64BinaryDataType = "base64Binary"
41	timeDatatypeRFC      = "dateTime:RFC3339"
42	timeDatatypeRFCNano  = "dateTime:RFC3339Nano"
43)
44
45// QueryAPI provides methods for performing synchronously flux query against InfluxDB server.
46type QueryAPI interface {
47	// QueryRaw executes flux query on the InfluxDB server and returns complete query result as a string with table annotations according to dialect
48	QueryRaw(ctx context.Context, query string, dialect *domain.Dialect) (string, error)
49	// Query executes flux query on the InfluxDB server and returns QueryTableResult which parses streamed response into structures representing flux table parts
50	Query(ctx context.Context, query string) (*QueryTableResult, error)
51}
52
53// NewQueryAPI returns new query client for querying buckets belonging to org
54func NewQueryAPI(org string, service http2.Service) QueryAPI {
55	return &queryAPI{
56		org:         org,
57		httpService: service,
58	}
59}
60
61// queryAPI implements QueryAPI interface
62type queryAPI struct {
63	org         string
64	httpService http2.Service
65	url         string
66	lock        sync.Mutex
67}
68
69func (q *queryAPI) QueryRaw(ctx context.Context, query string, dialect *domain.Dialect) (string, error) {
70	queryURL, err := q.queryURL()
71	if err != nil {
72		return "", err
73	}
74	queryType := domain.QueryTypeFlux
75	qr := domain.Query{Query: query, Type: &queryType, Dialect: dialect}
76	qrJSON, err := json.Marshal(qr)
77	if err != nil {
78		return "", err
79	}
80	if log.Level() >= ilog.DebugLevel {
81		log.Debugf("Query: %s", qrJSON)
82	}
83	var body string
84	perror := q.httpService.DoPostRequest(ctx, queryURL, bytes.NewReader(qrJSON), func(req *http.Request) {
85		req.Header.Set("Content-Type", "application/json")
86		req.Header.Set("Accept-Encoding", "gzip")
87	},
88		func(resp *http.Response) error {
89			if resp.Header.Get("Content-Encoding") == "gzip" {
90				resp.Body, err = gzip.NewReader(resp.Body)
91				if err != nil {
92					return err
93				}
94			}
95			respBody, err := ioutil.ReadAll(resp.Body)
96			if err != nil {
97				return err
98			}
99			body = string(respBody)
100			return nil
101		})
102	if perror != nil {
103		return "", perror
104	}
105	return body, nil
106}
107
108// DefaultDialect return flux query Dialect with full annotations (datatype, group, default), header and comma char as a delimiter
109func DefaultDialect() *domain.Dialect {
110	annotations := []domain.DialectAnnotations{domain.DialectAnnotationsDatatype, domain.DialectAnnotationsGroup, domain.DialectAnnotationsDefault}
111	delimiter := ","
112	header := true
113	return &domain.Dialect{
114		Annotations: &annotations,
115		Delimiter:   &delimiter,
116		Header:      &header,
117	}
118}
119
120func (q *queryAPI) Query(ctx context.Context, query string) (*QueryTableResult, error) {
121	var queryResult *QueryTableResult
122	queryURL, err := q.queryURL()
123	if err != nil {
124		return nil, err
125	}
126	queryType := domain.QueryTypeFlux
127	qr := domain.Query{Query: query, Type: &queryType, Dialect: DefaultDialect()}
128	qrJSON, err := json.Marshal(qr)
129	if err != nil {
130		return nil, err
131	}
132	if log.Level() >= ilog.DebugLevel {
133		log.Debugf("Query: %s", qrJSON)
134	}
135	perror := q.httpService.DoPostRequest(ctx, queryURL, bytes.NewReader(qrJSON), func(req *http.Request) {
136		req.Header.Set("Content-Type", "application/json")
137		req.Header.Set("Accept-Encoding", "gzip")
138	},
139		func(resp *http.Response) error {
140			if resp.Header.Get("Content-Encoding") == "gzip" {
141				resp.Body, err = gzip.NewReader(resp.Body)
142				if err != nil {
143					return err
144				}
145			}
146			csvReader := csv.NewReader(resp.Body)
147			csvReader.FieldsPerRecord = -1
148			queryResult = &QueryTableResult{Closer: resp.Body, csvReader: csvReader}
149			return nil
150		})
151	if perror != nil {
152		return queryResult, perror
153	}
154	return queryResult, nil
155}
156
157func (q *queryAPI) queryURL() (string, error) {
158	if q.url == "" {
159		u, err := url.Parse(q.httpService.ServerAPIURL())
160		if err != nil {
161			return "", err
162		}
163		u.Path = path.Join(u.Path, "query")
164
165		params := u.Query()
166		params.Set("org", q.org)
167		u.RawQuery = params.Encode()
168		q.lock.Lock()
169		q.url = u.String()
170		q.lock.Unlock()
171	}
172	return q.url, nil
173}
174
175// QueryTableResult parses streamed flux query response into structures representing flux table parts
176// Walking though the result is done by repeatedly calling Next() until returns false.
177// Actual flux table info (columns with names, data types, etc) is returned by TableMetadata() method.
178// Data are acquired by Record() method.
179// Preliminary end can be caused by an error, so when Next() return false, check Err() for an error
180type QueryTableResult struct {
181	io.Closer
182	csvReader     *csv.Reader
183	tablePosition int
184	tableChanged  bool
185	table         *query.FluxTableMetadata
186	record        *query.FluxRecord
187	err           error
188}
189
190// TablePosition returns actual flux table position in the result, or -1 if no table was found yet
191// Each new table is introduced by an annotation in csv
192func (q *QueryTableResult) TablePosition() int {
193	if q.table != nil {
194		return q.table.Position()
195	}
196	return -1
197}
198
199// TableMetadata returns actual flux table metadata
200func (q *QueryTableResult) TableMetadata() *query.FluxTableMetadata {
201	return q.table
202}
203
204// TableChanged returns true if last call of Next() found also new result table
205// Table information is available via TableMetadata method
206func (q *QueryTableResult) TableChanged() bool {
207	return q.tableChanged
208}
209
210// Record returns last parsed flux table data row
211// Use Record methods to access value and row properties
212func (q *QueryTableResult) Record() *query.FluxRecord {
213	return q.record
214}
215
216type parsingState int
217
218const (
219	parsingStateNormal parsingState = iota
220	parsingStateAnnotation
221	parsingStateNameRow
222	parsingStateError
223)
224
225// Next advances to next row in query result.
226// During the first time it is called, Next creates also table metadata
227// Actual parsed row is available through Record() function
228// Returns false in case of end or an error, otherwise true
229func (q *QueryTableResult) Next() bool {
230	var row []string
231	// set closing query in case of preliminary return
232	closer := func() {
233		if err := q.Close(); err != nil {
234			message := err.Error()
235			if q.err != nil {
236				message = fmt.Sprintf("%s,%s", message, q.err.Error())
237			}
238			q.err = errors.New(message)
239		}
240	}
241	defer func() {
242		closer()
243	}()
244	parsingState := parsingStateNormal
245	q.tableChanged = false
246	dataTypeAnnotationFound := false
247readRow:
248	row, q.err = q.csvReader.Read()
249	if q.err == io.EOF {
250		q.err = nil
251		return false
252	}
253	if q.err != nil {
254		return false
255	}
256
257	if len(row) <= 1 {
258		goto readRow
259	}
260	if len(row[0]) > 0 && row[0][0] == '#' {
261		if parsingState == parsingStateNormal {
262			q.table = query.NewFluxTableMetadata(q.tablePosition)
263			q.tablePosition++
264			q.tableChanged = true
265			for i := range row[1:] {
266				q.table.AddColumn(query.NewFluxColumn(i))
267			}
268			parsingState = parsingStateAnnotation
269		}
270	}
271	if q.table == nil {
272		q.err = errors.New("parsing error, annotations not found")
273		return false
274	}
275	if len(row)-1 != len(q.table.Columns()) {
276		q.err = fmt.Errorf("parsing error, row has different number of columns than the table: %d vs %d", len(row)-1, len(q.table.Columns()))
277		return false
278	}
279	switch row[0] {
280	case "":
281		switch parsingState {
282		case parsingStateAnnotation:
283			if !dataTypeAnnotationFound {
284				q.err = errors.New("parsing error, datatype annotation not found")
285				return false
286			}
287			parsingState = parsingStateNameRow
288			fallthrough
289		case parsingStateNameRow:
290			if row[1] == "error" {
291				parsingState = parsingStateError
292			} else {
293				for i, n := range row[1:] {
294					if q.table.Column(i) != nil {
295						q.table.Column(i).SetName(n)
296					}
297				}
298				parsingState = parsingStateNormal
299			}
300			goto readRow
301		case parsingStateError:
302			var message string
303			if len(row) > 1 && len(row[1]) > 0 {
304				message = row[1]
305			} else {
306				message = "unknown query error"
307			}
308			reference := ""
309			if len(row) > 2 && len(row[2]) > 0 {
310				reference = fmt.Sprintf(",%s", row[2])
311			}
312			q.err = fmt.Errorf("%s%s", message, reference)
313			return false
314		}
315		values := make(map[string]interface{})
316		for i, v := range row[1:] {
317			if q.table.Column(i) != nil {
318				values[q.table.Column(i).Name()], q.err = toValue(stringTernary(v, q.table.Column(i).DefaultValue()), q.table.Column(i).DataType(), q.table.Column(i).Name())
319				if q.err != nil {
320					return false
321				}
322			}
323		}
324		q.record = query.NewFluxRecord(q.table.Position(), values)
325	case "#datatype":
326		dataTypeAnnotationFound = true
327		for i, d := range row[1:] {
328			if q.table.Column(i) != nil {
329				q.table.Column(i).SetDataType(d)
330			}
331		}
332		goto readRow
333	case "#group":
334		for i, g := range row[1:] {
335			if q.table.Column(i) != nil {
336				q.table.Column(i).SetGroup(g == "true")
337			}
338		}
339		goto readRow
340	case "#default":
341		for i, c := range row[1:] {
342			if q.table.Column(i) != nil {
343				q.table.Column(i).SetDefaultValue(c)
344			}
345		}
346		goto readRow
347	}
348	// don't close query
349	closer = func() {}
350	return true
351}
352
353// Err returns an error raised during flux query response parsing
354func (q *QueryTableResult) Err() error {
355	return q.err
356}
357
358// Close reads remaining data and closes underlying Closer
359func (q *QueryTableResult) Close() error {
360	var err error
361	for err == nil {
362		_, err = q.csvReader.Read()
363	}
364	return q.Closer.Close()
365}
366
367// stringTernary returns a if not empty, otherwise b
368func stringTernary(a, b string) string {
369	if a == "" {
370		return b
371	}
372	return a
373}
374
375// toValues converts s into type by t
376func toValue(s, t, name string) (interface{}, error) {
377	if s == "" {
378		return nil, nil
379	}
380	switch t {
381	case stringDatatype:
382		return s, nil
383	case timeDatatypeRFC:
384		return time.Parse(time.RFC3339, s)
385	case timeDatatypeRFCNano:
386		return time.Parse(time.RFC3339Nano, s)
387	case durationDatatype:
388		return time.ParseDuration(s)
389	case doubleDatatype:
390		return strconv.ParseFloat(s, 64)
391	case boolDatatype:
392		if strings.ToLower(s) == "false" {
393			return false, nil
394		}
395		return true, nil
396	case longDatatype:
397		return strconv.ParseInt(s, 10, 64)
398	case uLongDatatype:
399		return strconv.ParseUint(s, 10, 64)
400	case base64BinaryDataType:
401		return base64.StdEncoding.DecodeString(s)
402	default:
403		return nil, fmt.Errorf("%s has unknown data type %s", name, t)
404	}
405}
406