1package client
2
3import (
4	"context"
5	"encoding/json"
6	"fmt"
7	"time"
8	"unicode/utf8"
9
10	"github.com/influxdata/flux"
11	"github.com/influxdata/flux/csv"
12	"github.com/influxdata/flux/lang"
13	"github.com/influxdata/influxdb/query"
14	"github.com/pkg/errors"
15	"github.com/prometheus/client_golang/prometheus"
16)
17
18type Controller interface {
19	Query(ctx context.Context, compiler flux.Compiler) (flux.Query, error)
20	PrometheusCollectors() []prometheus.Collector
21}
22
23// QueryRequest is a flux query request.
24type QueryRequest struct {
25	Type  string `json:"type"`
26	Query string `json:"query"`
27
28	// Flux fields
29	Extern  json.RawMessage `json:"extern,omitempty"`
30	AST     json.RawMessage `json:"ast,omitempty"`
31	Dialect QueryDialect    `json:"dialect"`
32	Now     time.Time       `json:"now"`
33
34	// PreferNoContent specifies if the Response to this request should
35	// contain any result. This is done for avoiding unnecessary
36	// bandwidth consumption in certain cases. For example, when the
37	// query produces side effects and the results do not matter. E.g.:
38	// 	from(...) |> ... |> to()
39	// For example, tasks do not use the results of queries, but only
40	// care about their side effects.
41	// To obtain a QueryRequest with no result, add the header
42	// `Prefer: return-no-content` to the HTTP request.
43	PreferNoContent bool
44	// PreferNoContentWithError is the same as above, but it forces the
45	// Response to contain an error if that is a Flux runtime error encoded
46	// in the response body.
47	// To obtain a QueryRequest with no result but runtime errors,
48	// add the header `Prefer: return-no-content-with-error` to the HTTP request.
49	PreferNoContentWithError bool
50}
51
52// QueryDialect is the formatting options for the query response.
53type QueryDialect struct {
54	Header         *bool    `json:"header"`
55	Delimiter      string   `json:"delimiter"`
56	CommentPrefix  string   `json:"commentPrefix"`
57	DateTimeFormat string   `json:"dateTimeFormat"`
58	Annotations    []string `json:"annotations"`
59}
60
61// WithDefaults adds default values to the request.
62func (r QueryRequest) WithDefaults() QueryRequest {
63	if r.Type == "" {
64		r.Type = "flux"
65	}
66	if r.Dialect.Delimiter == "" {
67		r.Dialect.Delimiter = ","
68	}
69	if r.Dialect.DateTimeFormat == "" {
70		r.Dialect.DateTimeFormat = "RFC3339"
71	}
72	if r.Dialect.Header == nil {
73		header := true
74		r.Dialect.Header = &header
75	}
76	return r
77}
78
79// Validate checks the query request and returns an error if the request is invalid.
80func (r QueryRequest) Validate() error {
81	if r.Query == "" && r.AST == nil {
82		return errors.New(`request body requires either query or AST`)
83	}
84
85	if r.Type != "flux" {
86		return fmt.Errorf(`unknown query type: %s`, r.Type)
87	}
88
89	if len(r.Dialect.CommentPrefix) > 1 {
90		return fmt.Errorf("invalid dialect comment prefix: must be length 0 or 1")
91	}
92
93	if len(r.Dialect.Delimiter) != 1 {
94		return fmt.Errorf("invalid dialect delimeter: must be length 1")
95	}
96
97	rn, size := utf8.DecodeRuneInString(r.Dialect.Delimiter)
98	if rn == utf8.RuneError && size == 1 {
99		return fmt.Errorf("invalid dialect delimeter character")
100	}
101
102	for _, a := range r.Dialect.Annotations {
103		switch a {
104		case "group", "datatype", "default":
105		default:
106			return fmt.Errorf(`unknown dialect annotation type: %s`, a)
107		}
108	}
109
110	switch r.Dialect.DateTimeFormat {
111	case "RFC3339", "RFC3339Nano":
112	default:
113		return fmt.Errorf(`unknown dialect date time format: %s`, r.Dialect.DateTimeFormat)
114	}
115
116	return nil
117}
118
119// ProxyRequest specifies a query request and the dialect for the results.
120type ProxyRequest struct {
121	// Compiler converts the query to a specification to run against the data.
122	Compiler flux.Compiler
123
124	// Dialect is the result encoder
125	Dialect flux.Dialect
126}
127
128// ProxyRequest returns a request to proxy from the flux.
129func (r QueryRequest) ProxyRequest() *ProxyRequest {
130	n := r.Now
131	if n.IsZero() {
132		n = time.Now()
133	}
134
135	// Query is preferred over spec
136	var compiler flux.Compiler
137	if r.Query != "" {
138		compiler = lang.FluxCompiler{
139			Query:  r.Query,
140			Extern: r.Extern,
141			Now:    n,
142		}
143	} else if len(r.AST) > 0 {
144		c := lang.ASTCompiler{
145			Extern: r.Extern,
146			AST:    r.AST,
147			Now:    n,
148		}
149		compiler = c
150	}
151
152	delimiter, _ := utf8.DecodeRuneInString(r.Dialect.Delimiter)
153
154	noHeader := false
155	if r.Dialect.Header != nil {
156		noHeader = !*r.Dialect.Header
157	}
158
159	var dialect flux.Dialect
160	if r.PreferNoContent {
161		dialect = &query.NoContentDialect{}
162	} else {
163		// TODO(nathanielc): Use commentPrefix and dateTimeFormat
164		// once they are supported.
165		encConfig := csv.ResultEncoderConfig{
166			NoHeader:    noHeader,
167			Delimiter:   delimiter,
168			Annotations: r.Dialect.Annotations,
169		}
170		if r.PreferNoContentWithError {
171			dialect = &query.NoContentWithErrorDialect{
172				ResultEncoderConfig: encConfig,
173			}
174		} else {
175			dialect = &csv.Dialect{
176				ResultEncoderConfig: encConfig,
177			}
178		}
179	}
180
181	return &ProxyRequest{
182		Compiler: compiler,
183		Dialect:  dialect,
184	}
185}
186