1// Package client (v2) is the current official Go client for InfluxDB.
2package client // import "github.com/influxdata/influxdb/client/v2"
3
4import (
5	"bytes"
6	"crypto/tls"
7	"encoding/json"
8	"errors"
9	"fmt"
10	"io"
11	"io/ioutil"
12	"mime"
13	"net/http"
14	"net/url"
15	"path"
16	"strconv"
17	"strings"
18	"time"
19
20	"github.com/influxdata/influxdb/models"
21)
22
// HTTPConfig is the config data needed to create an HTTP Client.
// It is the argument of NewHTTPClient.
type HTTPConfig struct {
	// Addr should be of the form "http://host:port"
	// or "http://[ipv6-host%zone]:port". NewHTTPClient rejects addresses
	// whose scheme is neither "http" nor "https".
	Addr string

	// Username is the influxdb username, optional. Basic-auth credentials
	// are only attached to requests when Username is non-empty.
	Username string

	// Password is the influxdb password, optional.
	Password string

	// UserAgent is the http User Agent, defaults to "InfluxDBClient".
	UserAgent string

	// Timeout is used as the Timeout of the underlying http.Client, so it
	// applies to every request made by the client (ping, write and query),
	// not only to writes. Defaults to no timeout.
	Timeout time.Duration

	// InsecureSkipVerify gets passed to the http client, if true, it will
	// skip https certificate verification. Defaults to false.
	InsecureSkipVerify bool

	// TLSConfig allows the user to set their own TLS config for the HTTP
	// Client. If set, this option overrides InsecureSkipVerify.
	TLSConfig *tls.Config

	// Proxy configures the Proxy function on the HTTP transport used by
	// the client. A nil value leaves the transport without a proxy function.
	Proxy func(req *http.Request) (*url.URL, error)
}
52
// BatchPointsConfig is the config data needed to create an instance of the BatchPoints struct.
// It is the argument of NewBatchPoints.
type BatchPointsConfig struct {
	// Precision is the write precision of the points, defaults to "ns".
	// NewBatchPoints validates it by parsing "1"+Precision as a
	// time.Duration, so it must be a unit suffix that time.ParseDuration
	// accepts.
	Precision string

	// Database is the database to write points to.
	Database string

	// RetentionPolicy is the retention policy of the points.
	RetentionPolicy string

	// WriteConsistency is the number of servers required to confirm write.
	WriteConsistency string
}
67
// Client is a client interface for writing & querying the database.
type Client interface {
	// Ping checks the status of the cluster, and will always return 0 time
	// and no error for UDP clients. On success it returns the duration of
	// the request and the server version string.
	Ping(timeout time.Duration) (time.Duration, string, error)

	// Write takes a BatchPoints object and writes all Points to InfluxDB.
	Write(bp BatchPoints) error

	// Query makes an InfluxDB Query on the database. This will fail if using
	// the UDP client.
	Query(q Query) (*Response, error)

	// Close releases any resources a Client may be using.
	Close() error
}
84
85// NewHTTPClient returns a new Client from the provided config.
86// Client is safe for concurrent use by multiple goroutines.
87func NewHTTPClient(conf HTTPConfig) (Client, error) {
88	if conf.UserAgent == "" {
89		conf.UserAgent = "InfluxDBClient"
90	}
91
92	u, err := url.Parse(conf.Addr)
93	if err != nil {
94		return nil, err
95	} else if u.Scheme != "http" && u.Scheme != "https" {
96		m := fmt.Sprintf("Unsupported protocol scheme: %s, your address"+
97			" must start with http:// or https://", u.Scheme)
98		return nil, errors.New(m)
99	}
100
101	tr := &http.Transport{
102		TLSClientConfig: &tls.Config{
103			InsecureSkipVerify: conf.InsecureSkipVerify,
104		},
105		Proxy: conf.Proxy,
106	}
107	if conf.TLSConfig != nil {
108		tr.TLSClientConfig = conf.TLSConfig
109	}
110	return &client{
111		url:       *u,
112		username:  conf.Username,
113		password:  conf.Password,
114		useragent: conf.UserAgent,
115		httpClient: &http.Client{
116			Timeout:   conf.Timeout,
117			Transport: tr,
118		},
119		transport: tr,
120	}, nil
121}
122
123// Ping will check to see if the server is up with an optional timeout on waiting for leader.
124// Ping returns how long the request took, the version of the server it connected to, and an error if one occurred.
125func (c *client) Ping(timeout time.Duration) (time.Duration, string, error) {
126	now := time.Now()
127
128	u := c.url
129	u.Path = path.Join(u.Path, "ping")
130
131	req, err := http.NewRequest("GET", u.String(), nil)
132	if err != nil {
133		return 0, "", err
134	}
135
136	req.Header.Set("User-Agent", c.useragent)
137
138	if c.username != "" {
139		req.SetBasicAuth(c.username, c.password)
140	}
141
142	if timeout > 0 {
143		params := req.URL.Query()
144		params.Set("wait_for_leader", fmt.Sprintf("%.0fs", timeout.Seconds()))
145		req.URL.RawQuery = params.Encode()
146	}
147
148	resp, err := c.httpClient.Do(req)
149	if err != nil {
150		return 0, "", err
151	}
152	defer resp.Body.Close()
153
154	body, err := ioutil.ReadAll(resp.Body)
155	if err != nil {
156		return 0, "", err
157	}
158
159	if resp.StatusCode != http.StatusNoContent {
160		var err = fmt.Errorf(string(body))
161		return 0, "", err
162	}
163
164	version := resp.Header.Get("X-Influxdb-Version")
165	return time.Since(now), version, nil
166}
167
// Close releases the client's resources by closing the idle connections
// held by the underlying HTTP transport. It always returns nil.
func (c *client) Close() error {
	c.transport.CloseIdleConnections()
	return nil
}
173
// client is the HTTP implementation behind NewHTTPClient.
//
// client is safe for concurrent use as the fields are all read-only
// once the client is instantiated.
type client struct {
	// N.B - if url.UserInfo is accessed in future modifications to the
	// methods on client, you will need to synchronize access to url.
	//
	// url is the base address; methods copy it by value before appending
	// their endpoint path.
	url        url.URL
	username   string // basic-auth user; auth is skipped when empty
	password   string // basic-auth password
	useragent  string // value of the User-Agent request header
	httpClient *http.Client
	transport  *http.Transport // kept so Close can drop idle connections
}
186
// BatchPoints is an interface into a batched grouping of points to write into
// InfluxDB together. BatchPoints is NOT thread-safe, you must create a separate
// batch for each goroutine.
//
// Instances are created with NewBatchPoints.
type BatchPoints interface {
	// AddPoint adds the given point to the Batch of points.
	AddPoint(p *Point)
	// AddPoints adds the given points to the Batch of points.
	AddPoints(ps []*Point)
	// Points lists the points in the Batch.
	Points() []*Point

	// Precision returns the currently set precision of this Batch.
	Precision() string
	// SetPrecision sets the precision of this batch. It returns an error
	// when the precision is rejected.
	SetPrecision(s string) error

	// Database returns the currently set database of this Batch.
	Database() string
	// SetDatabase sets the database of this Batch.
	SetDatabase(s string)

	// WriteConsistency returns the currently set write consistency of this Batch.
	WriteConsistency() string
	// SetWriteConsistency sets the write consistency of this Batch.
	SetWriteConsistency(s string)

	// RetentionPolicy returns the currently set retention policy of this Batch.
	RetentionPolicy() string
	// SetRetentionPolicy sets the retention policy of this Batch.
	SetRetentionPolicy(s string)
}
218
219// NewBatchPoints returns a BatchPoints interface based on the given config.
220func NewBatchPoints(conf BatchPointsConfig) (BatchPoints, error) {
221	if conf.Precision == "" {
222		conf.Precision = "ns"
223	}
224	if _, err := time.ParseDuration("1" + conf.Precision); err != nil {
225		return nil, err
226	}
227	bp := &batchpoints{
228		database:         conf.Database,
229		precision:        conf.Precision,
230		retentionPolicy:  conf.RetentionPolicy,
231		writeConsistency: conf.WriteConsistency,
232	}
233	return bp, nil
234}
235
// batchpoints is the BatchPoints implementation returned by NewBatchPoints.
// It holds the accumulated points together with the settings that are sent
// along with them when the batch is written.
type batchpoints struct {
	points           []*Point
	database         string
	precision        string
	retentionPolicy  string
	writeConsistency string
}
243
// AddPoint appends p to the batch. No validation is performed on p.
func (bp *batchpoints) AddPoint(p *Point) {
	bp.points = append(bp.points, p)
}
247
// AddPoints appends every element of ps to the batch, preserving order.
func (bp *batchpoints) AddPoints(ps []*Point) {
	bp.points = append(bp.points, ps...)
}
251
// Points returns the points accumulated so far. The batch's own slice is
// returned, not a copy.
func (bp *batchpoints) Points() []*Point {
	return bp.points
}
255
// Precision returns the currently set precision of the batch.
func (bp *batchpoints) Precision() string {
	return bp.precision
}
259
// Database returns the currently set database of the batch.
func (bp *batchpoints) Database() string {
	return bp.database
}
263
// WriteConsistency returns the currently set write consistency of the batch.
func (bp *batchpoints) WriteConsistency() string {
	return bp.writeConsistency
}
267
// RetentionPolicy returns the currently set retention policy of the batch.
func (bp *batchpoints) RetentionPolicy() string {
	return bp.retentionPolicy
}
271
272func (bp *batchpoints) SetPrecision(p string) error {
273	if _, err := time.ParseDuration("1" + p); err != nil {
274		return err
275	}
276	bp.precision = p
277	return nil
278}
279
// SetDatabase sets the database of the batch to db.
func (bp *batchpoints) SetDatabase(db string) {
	bp.database = db
}
283
// SetWriteConsistency sets the write consistency of the batch to wc.
// The value is stored as given, without validation.
func (bp *batchpoints) SetWriteConsistency(wc string) {
	bp.writeConsistency = wc
}
287
// SetRetentionPolicy sets the retention policy of the batch to rp.
func (bp *batchpoints) SetRetentionPolicy(rp string) {
	bp.retentionPolicy = rp
}
291
// Point represents a single data point. It wraps a models.Point and
// exposes it through the methods defined on *Point.
type Point struct {
	pt models.Point
}
296
297// NewPoint returns a point with the given timestamp. If a timestamp is not
298// given, then data is sent to the database without a timestamp, in which case
299// the server will assign local time upon reception. NOTE: it is recommended to
300// send data with a timestamp.
301func NewPoint(
302	name string,
303	tags map[string]string,
304	fields map[string]interface{},
305	t ...time.Time,
306) (*Point, error) {
307	var T time.Time
308	if len(t) > 0 {
309		T = t[0]
310	}
311
312	pt, err := models.NewPoint(name, models.NewTags(tags), fields, T)
313	if err != nil {
314		return nil, err
315	}
316	return &Point{
317		pt: pt,
318	}, nil
319}
320
// String returns a line-protocol string of the Point, as produced by the
// underlying models.Point.
func (p *Point) String() string {
	return p.pt.String()
}
325
// PrecisionString returns a line-protocol string of the Point,
// with the timestamp formatted for the given precision. The formatting is
// delegated to the underlying models.Point.
func (p *Point) PrecisionString(precision string) string {
	return p.pt.PrecisionString(precision)
}
331
// Name returns the measurement name of the point, converted to a string.
func (p *Point) Name() string {
	return string(p.pt.Name())
}
336
// Tags returns the tags associated with the point as a map from tag key to
// tag value.
func (p *Point) Tags() map[string]string {
	return p.pt.Tags().Map()
}
341
// Time returns the timestamp for the point.
func (p *Point) Time() time.Time {
	return p.pt.Time()
}
346
// UnixNano returns the timestamp of the point in nanoseconds since the Unix
// epoch.
func (p *Point) UnixNano() int64 {
	return p.pt.UnixNano()
}
351
// Fields returns the fields for the point, or the error reported by the
// underlying models.Point when they cannot be produced.
func (p *Point) Fields() (map[string]interface{}, error) {
	return p.pt.Fields()
}
356
// NewPointFrom returns a point from the provided models.Point. The value is
// wrapped as-is, without validation.
func NewPointFrom(pt models.Point) *Point {
	return &Point{pt: pt}
}
361
362func (c *client) Write(bp BatchPoints) error {
363	var b bytes.Buffer
364
365	for _, p := range bp.Points() {
366		if p == nil {
367			continue
368		}
369		if _, err := b.WriteString(p.pt.PrecisionString(bp.Precision())); err != nil {
370			return err
371		}
372
373		if err := b.WriteByte('\n'); err != nil {
374			return err
375		}
376	}
377
378	u := c.url
379	u.Path = path.Join(u.Path, "write")
380
381	req, err := http.NewRequest("POST", u.String(), &b)
382	if err != nil {
383		return err
384	}
385	req.Header.Set("Content-Type", "")
386	req.Header.Set("User-Agent", c.useragent)
387	if c.username != "" {
388		req.SetBasicAuth(c.username, c.password)
389	}
390
391	params := req.URL.Query()
392	params.Set("db", bp.Database())
393	params.Set("rp", bp.RetentionPolicy())
394	params.Set("precision", bp.Precision())
395	params.Set("consistency", bp.WriteConsistency())
396	req.URL.RawQuery = params.Encode()
397
398	resp, err := c.httpClient.Do(req)
399	if err != nil {
400		return err
401	}
402	defer resp.Body.Close()
403
404	body, err := ioutil.ReadAll(resp.Body)
405	if err != nil {
406		return err
407	}
408
409	if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK {
410		var err = fmt.Errorf(string(body))
411		return err
412	}
413
414	return nil
415}
416
// Query defines a query to send to the server.
type Query struct {
	// Command is the query text, sent as the "q" parameter.
	Command string
	// Database is sent as the "db" parameter.
	Database string
	// RetentionPolicy is sent as the "rp" parameter when non-empty.
	RetentionPolicy string
	// Precision is sent as the "epoch" parameter when non-empty.
	Precision string
	// Chunked requests a chunked response ("chunked=true").
	Chunked bool
	// ChunkSize is sent as "chunk_size" when Chunked is set and the value
	// is positive.
	ChunkSize int
	// Parameters is JSON-encoded and sent as the "params" parameter.
	Parameters map[string]interface{}
}
427
428// NewQuery returns a query object.
429// The database and precision arguments can be empty strings if they are not needed for the query.
430func NewQuery(command, database, precision string) Query {
431	return Query{
432		Command:    command,
433		Database:   database,
434		Precision:  precision,
435		Parameters: make(map[string]interface{}),
436	}
437}
438
439// NewQueryWithRP returns a query object.
440// The database, retention policy, and precision arguments can be empty strings if they are not needed
441// for the query. Setting the retention policy only works on InfluxDB versions 1.6 or greater.
442func NewQueryWithRP(command, database, retentionPolicy, precision string) Query {
443	return Query{
444		Command:         command,
445		Database:        database,
446		RetentionPolicy: retentionPolicy,
447		Precision:       precision,
448		Parameters:      make(map[string]interface{}),
449	}
450}
451
452// NewQueryWithParameters returns a query object.
453// The database and precision arguments can be empty strings if they are not needed for the query.
454// parameters is a map of the parameter names used in the command to their values.
455func NewQueryWithParameters(command, database, precision string, parameters map[string]interface{}) Query {
456	return Query{
457		Command:    command,
458		Database:   database,
459		Precision:  precision,
460		Parameters: parameters,
461	}
462}
463
// Response represents a list of statement results.
type Response struct {
	// Results holds one entry per statement result.
	Results []Result
	// Err is a response-level error message, decoded from the "error"
	// JSON key. It is empty when no such error was reported.
	Err string `json:"error,omitempty"`
}
469
470// Error returns the first error from any statement.
471// It returns nil if no errors occurred on any statements.
472func (r *Response) Error() error {
473	if r.Err != "" {
474		return fmt.Errorf(r.Err)
475	}
476	for _, result := range r.Results {
477		if result.Err != "" {
478			return fmt.Errorf(result.Err)
479		}
480	}
481	return nil
482}
483
// Message represents a user message. Messages are carried by a Result.
type Message struct {
	Level string
	Text  string
}
489
// Result represents a resultset returned from a single statement.
type Result struct {
	// Series holds the rows of the resultset.
	Series []models.Row
	// Messages holds the user messages attached to the resultset.
	Messages []*Message
	// Err is the statement's error message, decoded from the "error" JSON
	// key. It is empty when the statement reported no error.
	Err string `json:"error,omitempty"`
}
496
497// Query sends a command to the server and returns the Response.
498func (c *client) Query(q Query) (*Response, error) {
499	u := c.url
500	u.Path = path.Join(u.Path, "query")
501
502	jsonParameters, err := json.Marshal(q.Parameters)
503
504	if err != nil {
505		return nil, err
506	}
507
508	req, err := http.NewRequest("POST", u.String(), nil)
509	if err != nil {
510		return nil, err
511	}
512
513	req.Header.Set("Content-Type", "")
514	req.Header.Set("User-Agent", c.useragent)
515
516	if c.username != "" {
517		req.SetBasicAuth(c.username, c.password)
518	}
519
520	params := req.URL.Query()
521	params.Set("q", q.Command)
522	params.Set("db", q.Database)
523	if q.RetentionPolicy != "" {
524		params.Set("rp", q.RetentionPolicy)
525	}
526	params.Set("params", string(jsonParameters))
527	if q.Chunked {
528		params.Set("chunked", "true")
529		if q.ChunkSize > 0 {
530			params.Set("chunk_size", strconv.Itoa(q.ChunkSize))
531		}
532	}
533
534	if q.Precision != "" {
535		params.Set("epoch", q.Precision)
536	}
537	req.URL.RawQuery = params.Encode()
538
539	resp, err := c.httpClient.Do(req)
540	if err != nil {
541		return nil, err
542	}
543	defer resp.Body.Close()
544
545	// If we lack a X-Influxdb-Version header, then we didn't get a response from influxdb
546	// but instead some other service. If the error code is also a 500+ code, then some
547	// downstream loadbalancer/proxy/etc had an issue and we should report that.
548	if resp.Header.Get("X-Influxdb-Version") == "" && resp.StatusCode >= http.StatusInternalServerError {
549		body, err := ioutil.ReadAll(resp.Body)
550		if err != nil || len(body) == 0 {
551			return nil, fmt.Errorf("received status code %d from downstream server", resp.StatusCode)
552		}
553
554		return nil, fmt.Errorf("received status code %d from downstream server, with response body: %q", resp.StatusCode, body)
555	}
556
557	// If we get an unexpected content type, then it is also not from influx direct and therefore
558	// we want to know what we received and what status code was returned for debugging purposes.
559	if cType, _, _ := mime.ParseMediaType(resp.Header.Get("Content-Type")); cType != "application/json" {
560		// Read up to 1kb of the body to help identify downstream errors and limit the impact of things
561		// like downstream serving a large file
562		body, err := ioutil.ReadAll(io.LimitReader(resp.Body, 1024))
563		if err != nil || len(body) == 0 {
564			return nil, fmt.Errorf("expected json response, got empty body, with status: %v", resp.StatusCode)
565		}
566
567		return nil, fmt.Errorf("expected json response, got %q, with status: %v and response body: %q", cType, resp.StatusCode, body)
568	}
569
570	var response Response
571	if q.Chunked {
572		cr := NewChunkedResponse(resp.Body)
573		for {
574			r, err := cr.NextResponse()
575			if err != nil {
576				// If we got an error while decoding the response, send that back.
577				return nil, err
578			}
579
580			if r == nil {
581				break
582			}
583
584			response.Results = append(response.Results, r.Results...)
585			if r.Err != "" {
586				response.Err = r.Err
587				break
588			}
589		}
590	} else {
591		dec := json.NewDecoder(resp.Body)
592		dec.UseNumber()
593		decErr := dec.Decode(&response)
594
595		// ignore this error if we got an invalid status code
596		if decErr != nil && decErr.Error() == "EOF" && resp.StatusCode != http.StatusOK {
597			decErr = nil
598		}
599		// If we got a valid decode error, send that back
600		if decErr != nil {
601			return nil, fmt.Errorf("unable to decode json: received status code %d err: %s", resp.StatusCode, decErr)
602		}
603	}
604
605	// If we don't have an error in our json response, and didn't get statusOK
606	// then send back an error
607	if resp.StatusCode != http.StatusOK && response.Error() == nil {
608		return &response, fmt.Errorf("received status code %d from server", resp.StatusCode)
609	}
610	return &response, nil
611}
612
// duplexReader reads from r and writes what it read to w, while itself
// satisfying the io.Reader interface.
type duplexReader struct {
	r io.Reader // source of the data
	w io.Writer // receives a copy of the data read from r
}
619
620func (r *duplexReader) Read(p []byte) (n int, err error) {
621	n, err = r.r.Read(p)
622	if err == nil {
623		r.w.Write(p[:n])
624	}
625	return n, err
626}
627
// ChunkedResponse represents a response from the server that
// uses chunking to stream the output. Instances are created with
// NewChunkedResponse and consumed with NextResponse.
type ChunkedResponse struct {
	dec    *json.Decoder // decodes successive responses from duplex
	duplex *duplexReader // wraps the source stream and copies what is read into buf
	buf    bytes.Buffer  // raw text read since the last successful decode; used to build error messages
}
635
636// NewChunkedResponse reads a stream and produces responses from the stream.
637func NewChunkedResponse(r io.Reader) *ChunkedResponse {
638	resp := &ChunkedResponse{}
639	resp.duplex = &duplexReader{r: r, w: &resp.buf}
640	resp.dec = json.NewDecoder(resp.duplex)
641	resp.dec.UseNumber()
642	return resp
643}
644
645// NextResponse reads the next line of the stream and returns a response.
646func (r *ChunkedResponse) NextResponse() (*Response, error) {
647	var response Response
648
649	if err := r.dec.Decode(&response); err != nil {
650		if err == io.EOF {
651			return nil, nil
652		}
653		// A decoding error happened. This probably means the server crashed
654		// and sent a last-ditch error message to us. Ensure we have read the
655		// entirety of the connection to get any remaining error text.
656		io.Copy(ioutil.Discard, r.duplex)
657		return nil, errors.New(strings.TrimSpace(r.buf.String()))
658	}
659
660	r.buf.Reset()
661	return &response, nil
662}
663