1package client 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "time" 8 "unicode/utf8" 9 10 "github.com/influxdata/flux" 11 "github.com/influxdata/flux/csv" 12 "github.com/influxdata/flux/lang" 13 "github.com/influxdata/influxdb/query" 14 "github.com/pkg/errors" 15 "github.com/prometheus/client_golang/prometheus" 16) 17 18type Controller interface { 19 Query(ctx context.Context, compiler flux.Compiler) (flux.Query, error) 20 PrometheusCollectors() []prometheus.Collector 21} 22 23// QueryRequest is a flux query request. 24type QueryRequest struct { 25 Type string `json:"type"` 26 Query string `json:"query"` 27 28 // Flux fields 29 Extern json.RawMessage `json:"extern,omitempty"` 30 AST json.RawMessage `json:"ast,omitempty"` 31 Dialect QueryDialect `json:"dialect"` 32 Now time.Time `json:"now"` 33 34 // PreferNoContent specifies if the Response to this request should 35 // contain any result. This is done for avoiding unnecessary 36 // bandwidth consumption in certain cases. For example, when the 37 // query produces side effects and the results do not matter. E.g.: 38 // from(...) |> ... |> to() 39 // For example, tasks do not use the results of queries, but only 40 // care about their side effects. 41 // To obtain a QueryRequest with no result, add the header 42 // `Prefer: return-no-content` to the HTTP request. 43 PreferNoContent bool 44 // PreferNoContentWithError is the same as above, but it forces the 45 // Response to contain an error if that is a Flux runtime error encoded 46 // in the response body. 47 // To obtain a QueryRequest with no result but runtime errors, 48 // add the header `Prefer: return-no-content-with-error` to the HTTP request. 49 PreferNoContentWithError bool 50} 51 52// QueryDialect is the formatting options for the query response. 53type QueryDialect struct { 54 Header *bool `json:"header"` 55 Delimiter string `json:"delimiter"` 56 CommentPrefix string `json:"commentPrefix"` 57 DateTimeFormat string `json:"dateTimeFormat"` 58 Annotations []string `json:"annotations"` 59} 60 61// WithDefaults adds default values to the request. 62func (r QueryRequest) WithDefaults() QueryRequest { 63 if r.Type == "" { 64 r.Type = "flux" 65 } 66 if r.Dialect.Delimiter == "" { 67 r.Dialect.Delimiter = "," 68 } 69 if r.Dialect.DateTimeFormat == "" { 70 r.Dialect.DateTimeFormat = "RFC3339" 71 } 72 if r.Dialect.Header == nil { 73 header := true 74 r.Dialect.Header = &header 75 } 76 return r 77} 78 79// Validate checks the query request and returns an error if the request is invalid. 80func (r QueryRequest) Validate() error { 81 if r.Query == "" && r.AST == nil { 82 return errors.New(`request body requires either query or AST`) 83 } 84 85 if r.Type != "flux" { 86 return fmt.Errorf(`unknown query type: %s`, r.Type) 87 } 88 89 if len(r.Dialect.CommentPrefix) > 1 { 90 return fmt.Errorf("invalid dialect comment prefix: must be length 0 or 1") 91 } 92 93 if len(r.Dialect.Delimiter) != 1 { 94 return fmt.Errorf("invalid dialect delimeter: must be length 1") 95 } 96 97 rn, size := utf8.DecodeRuneInString(r.Dialect.Delimiter) 98 if rn == utf8.RuneError && size == 1 { 99 return fmt.Errorf("invalid dialect delimeter character") 100 } 101 102 for _, a := range r.Dialect.Annotations { 103 switch a { 104 case "group", "datatype", "default": 105 default: 106 return fmt.Errorf(`unknown dialect annotation type: %s`, a) 107 } 108 } 109 110 switch r.Dialect.DateTimeFormat { 111 case "RFC3339", "RFC3339Nano": 112 default: 113 return fmt.Errorf(`unknown dialect date time format: %s`, r.Dialect.DateTimeFormat) 114 } 115 116 return nil 117} 118 119// ProxyRequest specifies a query request and the dialect for the results. 120type ProxyRequest struct { 121 // Compiler converts the query to a specification to run against the data. 122 Compiler flux.Compiler 123 124 // Dialect is the result encoder 125 Dialect flux.Dialect 126} 127 128// ProxyRequest returns a request to proxy from the flux. 129func (r QueryRequest) ProxyRequest() *ProxyRequest { 130 n := r.Now 131 if n.IsZero() { 132 n = time.Now() 133 } 134 135 // Query is preferred over spec 136 var compiler flux.Compiler 137 if r.Query != "" { 138 compiler = lang.FluxCompiler{ 139 Query: r.Query, 140 Extern: r.Extern, 141 Now: n, 142 } 143 } else if len(r.AST) > 0 { 144 c := lang.ASTCompiler{ 145 Extern: r.Extern, 146 AST: r.AST, 147 Now: n, 148 } 149 compiler = c 150 } 151 152 delimiter, _ := utf8.DecodeRuneInString(r.Dialect.Delimiter) 153 154 noHeader := false 155 if r.Dialect.Header != nil { 156 noHeader = !*r.Dialect.Header 157 } 158 159 var dialect flux.Dialect 160 if r.PreferNoContent { 161 dialect = &query.NoContentDialect{} 162 } else { 163 // TODO(nathanielc): Use commentPrefix and dateTimeFormat 164 // once they are supported. 165 encConfig := csv.ResultEncoderConfig{ 166 NoHeader: noHeader, 167 Delimiter: delimiter, 168 Annotations: r.Dialect.Annotations, 169 } 170 if r.PreferNoContentWithError { 171 dialect = &query.NoContentWithErrorDialect{ 172 ResultEncoderConfig: encConfig, 173 } 174 } else { 175 dialect = &csv.Dialect{ 176 ResultEncoderConfig: encConfig, 177 } 178 } 179 } 180 181 return &ProxyRequest{ 182 Compiler: compiler, 183 Dialect: dialect, 184 } 185} 186