// Package client (v2) is the current official Go client for InfluxDB.
package client // import "github.com/influxdata/influxdb/client/v2"

import (
	"bytes"
	"crypto/tls"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"mime"
	"net/http"
	"net/url"
	"path"
	"strconv"
	"strings"
	"time"

	"github.com/influxdata/influxdb/models"
)

// HTTPConfig is the config data needed to create an HTTP Client.
type HTTPConfig struct {
	// Addr should be of the form "http://host:port"
	// or "http://[ipv6-host%zone]:port". Only the http and https schemes
	// are accepted by NewHTTPClient.
	Addr string

	// Username is the influxdb username, optional. Basic auth is only sent
	// when Username is non-empty.
	Username string

	// Password is the influxdb password, optional.
	Password string

	// UserAgent is the http User Agent, defaults to "InfluxDBClient".
	UserAgent string

	// Timeout is handed to the underlying http.Client, so it bounds every
	// request the client makes (ping, write and query). Defaults to no
	// timeout.
	Timeout time.Duration

	// InsecureSkipVerify gets passed to the http client, if true, it will
	// skip https certificate verification. Defaults to false.
	InsecureSkipVerify bool

	// TLSConfig allows the user to set their own TLS config for the HTTP
	// Client. If set, this option overrides InsecureSkipVerify.
	TLSConfig *tls.Config

	// Proxy configures the Proxy function on the HTTP client.
	Proxy func(req *http.Request) (*url.URL, error)
}

// BatchPointsConfig is the config data needed to create an instance of the BatchPoints struct.
type BatchPointsConfig struct {
	// Precision is the write precision of the points, defaults to "ns".
	// NewBatchPoints rejects values that time.ParseDuration cannot parse
	// as a unit (it validates "1" + Precision).
	Precision string

	// Database is the database to write points to.
	Database string

	// RetentionPolicy is the retention policy of the points.
	RetentionPolicy string

	// WriteConsistency is the number of servers required to confirm write.
	WriteConsistency string
}

// Client is a client interface for writing & querying the database.
69type Client interface { 70 // Ping checks that status of cluster, and will always return 0 time and no 71 // error for UDP clients. 72 Ping(timeout time.Duration) (time.Duration, string, error) 73 74 // Write takes a BatchPoints object and writes all Points to InfluxDB. 75 Write(bp BatchPoints) error 76 77 // Query makes an InfluxDB Query on the database. This will fail if using 78 // the UDP client. 79 Query(q Query) (*Response, error) 80 81 // Close releases any resources a Client may be using. 82 Close() error 83} 84 85// NewHTTPClient returns a new Client from the provided config. 86// Client is safe for concurrent use by multiple goroutines. 87func NewHTTPClient(conf HTTPConfig) (Client, error) { 88 if conf.UserAgent == "" { 89 conf.UserAgent = "InfluxDBClient" 90 } 91 92 u, err := url.Parse(conf.Addr) 93 if err != nil { 94 return nil, err 95 } else if u.Scheme != "http" && u.Scheme != "https" { 96 m := fmt.Sprintf("Unsupported protocol scheme: %s, your address"+ 97 " must start with http:// or https://", u.Scheme) 98 return nil, errors.New(m) 99 } 100 101 tr := &http.Transport{ 102 TLSClientConfig: &tls.Config{ 103 InsecureSkipVerify: conf.InsecureSkipVerify, 104 }, 105 Proxy: conf.Proxy, 106 } 107 if conf.TLSConfig != nil { 108 tr.TLSClientConfig = conf.TLSConfig 109 } 110 return &client{ 111 url: *u, 112 username: conf.Username, 113 password: conf.Password, 114 useragent: conf.UserAgent, 115 httpClient: &http.Client{ 116 Timeout: conf.Timeout, 117 Transport: tr, 118 }, 119 transport: tr, 120 }, nil 121} 122 123// Ping will check to see if the server is up with an optional timeout on waiting for leader. 124// Ping returns how long the request took, the version of the server it connected to, and an error if one occurred. 
125func (c *client) Ping(timeout time.Duration) (time.Duration, string, error) { 126 now := time.Now() 127 128 u := c.url 129 u.Path = path.Join(u.Path, "ping") 130 131 req, err := http.NewRequest("GET", u.String(), nil) 132 if err != nil { 133 return 0, "", err 134 } 135 136 req.Header.Set("User-Agent", c.useragent) 137 138 if c.username != "" { 139 req.SetBasicAuth(c.username, c.password) 140 } 141 142 if timeout > 0 { 143 params := req.URL.Query() 144 params.Set("wait_for_leader", fmt.Sprintf("%.0fs", timeout.Seconds())) 145 req.URL.RawQuery = params.Encode() 146 } 147 148 resp, err := c.httpClient.Do(req) 149 if err != nil { 150 return 0, "", err 151 } 152 defer resp.Body.Close() 153 154 body, err := ioutil.ReadAll(resp.Body) 155 if err != nil { 156 return 0, "", err 157 } 158 159 if resp.StatusCode != http.StatusNoContent { 160 var err = fmt.Errorf(string(body)) 161 return 0, "", err 162 } 163 164 version := resp.Header.Get("X-Influxdb-Version") 165 return time.Since(now), version, nil 166} 167 168// Close releases the client's resources. 169func (c *client) Close() error { 170 c.transport.CloseIdleConnections() 171 return nil 172} 173 174// client is safe for concurrent use as the fields are all read-only 175// once the client is instantiated. 176type client struct { 177 // N.B - if url.UserInfo is accessed in future modifications to the 178 // methods on client, you will need to synchronize access to url. 179 url url.URL 180 username string 181 password string 182 useragent string 183 httpClient *http.Client 184 transport *http.Transport 185} 186 187// BatchPoints is an interface into a batched grouping of points to write into 188// InfluxDB together. BatchPoints is NOT thread-safe, you must create a separate 189// batch for each goroutine. 190type BatchPoints interface { 191 // AddPoint adds the given point to the Batch of points. 192 AddPoint(p *Point) 193 // AddPoints adds the given points to the Batch of points. 
194 AddPoints(ps []*Point) 195 // Points lists the points in the Batch. 196 Points() []*Point 197 198 // Precision returns the currently set precision of this Batch. 199 Precision() string 200 // SetPrecision sets the precision of this batch. 201 SetPrecision(s string) error 202 203 // Database returns the currently set database of this Batch. 204 Database() string 205 // SetDatabase sets the database of this Batch. 206 SetDatabase(s string) 207 208 // WriteConsistency returns the currently set write consistency of this Batch. 209 WriteConsistency() string 210 // SetWriteConsistency sets the write consistency of this Batch. 211 SetWriteConsistency(s string) 212 213 // RetentionPolicy returns the currently set retention policy of this Batch. 214 RetentionPolicy() string 215 // SetRetentionPolicy sets the retention policy of this Batch. 216 SetRetentionPolicy(s string) 217} 218 219// NewBatchPoints returns a BatchPoints interface based on the given config. 220func NewBatchPoints(conf BatchPointsConfig) (BatchPoints, error) { 221 if conf.Precision == "" { 222 conf.Precision = "ns" 223 } 224 if _, err := time.ParseDuration("1" + conf.Precision); err != nil { 225 return nil, err 226 } 227 bp := &batchpoints{ 228 database: conf.Database, 229 precision: conf.Precision, 230 retentionPolicy: conf.RetentionPolicy, 231 writeConsistency: conf.WriteConsistency, 232 } 233 return bp, nil 234} 235 236type batchpoints struct { 237 points []*Point 238 database string 239 precision string 240 retentionPolicy string 241 writeConsistency string 242} 243 244func (bp *batchpoints) AddPoint(p *Point) { 245 bp.points = append(bp.points, p) 246} 247 248func (bp *batchpoints) AddPoints(ps []*Point) { 249 bp.points = append(bp.points, ps...) 
250} 251 252func (bp *batchpoints) Points() []*Point { 253 return bp.points 254} 255 256func (bp *batchpoints) Precision() string { 257 return bp.precision 258} 259 260func (bp *batchpoints) Database() string { 261 return bp.database 262} 263 264func (bp *batchpoints) WriteConsistency() string { 265 return bp.writeConsistency 266} 267 268func (bp *batchpoints) RetentionPolicy() string { 269 return bp.retentionPolicy 270} 271 272func (bp *batchpoints) SetPrecision(p string) error { 273 if _, err := time.ParseDuration("1" + p); err != nil { 274 return err 275 } 276 bp.precision = p 277 return nil 278} 279 280func (bp *batchpoints) SetDatabase(db string) { 281 bp.database = db 282} 283 284func (bp *batchpoints) SetWriteConsistency(wc string) { 285 bp.writeConsistency = wc 286} 287 288func (bp *batchpoints) SetRetentionPolicy(rp string) { 289 bp.retentionPolicy = rp 290} 291 292// Point represents a single data point. 293type Point struct { 294 pt models.Point 295} 296 297// NewPoint returns a point with the given timestamp. If a timestamp is not 298// given, then data is sent to the database without a timestamp, in which case 299// the server will assign local time upon reception. NOTE: it is recommended to 300// send data with a timestamp. 301func NewPoint( 302 name string, 303 tags map[string]string, 304 fields map[string]interface{}, 305 t ...time.Time, 306) (*Point, error) { 307 var T time.Time 308 if len(t) > 0 { 309 T = t[0] 310 } 311 312 pt, err := models.NewPoint(name, models.NewTags(tags), fields, T) 313 if err != nil { 314 return nil, err 315 } 316 return &Point{ 317 pt: pt, 318 }, nil 319} 320 321// String returns a line-protocol string of the Point. 322func (p *Point) String() string { 323 return p.pt.String() 324} 325 326// PrecisionString returns a line-protocol string of the Point, 327// with the timestamp formatted for the given precision. 
328func (p *Point) PrecisionString(precision string) string { 329 return p.pt.PrecisionString(precision) 330} 331 332// Name returns the measurement name of the point. 333func (p *Point) Name() string { 334 return string(p.pt.Name()) 335} 336 337// Tags returns the tags associated with the point. 338func (p *Point) Tags() map[string]string { 339 return p.pt.Tags().Map() 340} 341 342// Time return the timestamp for the point. 343func (p *Point) Time() time.Time { 344 return p.pt.Time() 345} 346 347// UnixNano returns timestamp of the point in nanoseconds since Unix epoch. 348func (p *Point) UnixNano() int64 { 349 return p.pt.UnixNano() 350} 351 352// Fields returns the fields for the point. 353func (p *Point) Fields() (map[string]interface{}, error) { 354 return p.pt.Fields() 355} 356 357// NewPointFrom returns a point from the provided models.Point. 358func NewPointFrom(pt models.Point) *Point { 359 return &Point{pt: pt} 360} 361 362func (c *client) Write(bp BatchPoints) error { 363 var b bytes.Buffer 364 365 for _, p := range bp.Points() { 366 if p == nil { 367 continue 368 } 369 if _, err := b.WriteString(p.pt.PrecisionString(bp.Precision())); err != nil { 370 return err 371 } 372 373 if err := b.WriteByte('\n'); err != nil { 374 return err 375 } 376 } 377 378 u := c.url 379 u.Path = path.Join(u.Path, "write") 380 381 req, err := http.NewRequest("POST", u.String(), &b) 382 if err != nil { 383 return err 384 } 385 req.Header.Set("Content-Type", "") 386 req.Header.Set("User-Agent", c.useragent) 387 if c.username != "" { 388 req.SetBasicAuth(c.username, c.password) 389 } 390 391 params := req.URL.Query() 392 params.Set("db", bp.Database()) 393 params.Set("rp", bp.RetentionPolicy()) 394 params.Set("precision", bp.Precision()) 395 params.Set("consistency", bp.WriteConsistency()) 396 req.URL.RawQuery = params.Encode() 397 398 resp, err := c.httpClient.Do(req) 399 if err != nil { 400 return err 401 } 402 defer resp.Body.Close() 403 404 body, err := 
ioutil.ReadAll(resp.Body) 405 if err != nil { 406 return err 407 } 408 409 if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK { 410 var err = fmt.Errorf(string(body)) 411 return err 412 } 413 414 return nil 415} 416 417// Query defines a query to send to the server. 418type Query struct { 419 Command string 420 Database string 421 RetentionPolicy string 422 Precision string 423 Chunked bool 424 ChunkSize int 425 Parameters map[string]interface{} 426} 427 428// NewQuery returns a query object. 429// The database and precision arguments can be empty strings if they are not needed for the query. 430func NewQuery(command, database, precision string) Query { 431 return Query{ 432 Command: command, 433 Database: database, 434 Precision: precision, 435 Parameters: make(map[string]interface{}), 436 } 437} 438 439// NewQueryWithRP returns a query object. 440// The database, retention policy, and precision arguments can be empty strings if they are not needed 441// for the query. Setting the retention policy only works on InfluxDB versions 1.6 or greater. 442func NewQueryWithRP(command, database, retentionPolicy, precision string) Query { 443 return Query{ 444 Command: command, 445 Database: database, 446 RetentionPolicy: retentionPolicy, 447 Precision: precision, 448 Parameters: make(map[string]interface{}), 449 } 450} 451 452// NewQueryWithParameters returns a query object. 453// The database and precision arguments can be empty strings if they are not needed for the query. 454// parameters is a map of the parameter names used in the command to their values. 455func NewQueryWithParameters(command, database, precision string, parameters map[string]interface{}) Query { 456 return Query{ 457 Command: command, 458 Database: database, 459 Precision: precision, 460 Parameters: parameters, 461 } 462} 463 464// Response represents a list of statement results. 
465type Response struct { 466 Results []Result 467 Err string `json:"error,omitempty"` 468} 469 470// Error returns the first error from any statement. 471// It returns nil if no errors occurred on any statements. 472func (r *Response) Error() error { 473 if r.Err != "" { 474 return fmt.Errorf(r.Err) 475 } 476 for _, result := range r.Results { 477 if result.Err != "" { 478 return fmt.Errorf(result.Err) 479 } 480 } 481 return nil 482} 483 484// Message represents a user message. 485type Message struct { 486 Level string 487 Text string 488} 489 490// Result represents a resultset returned from a single statement. 491type Result struct { 492 Series []models.Row 493 Messages []*Message 494 Err string `json:"error,omitempty"` 495} 496 497// Query sends a command to the server and returns the Response. 498func (c *client) Query(q Query) (*Response, error) { 499 u := c.url 500 u.Path = path.Join(u.Path, "query") 501 502 jsonParameters, err := json.Marshal(q.Parameters) 503 504 if err != nil { 505 return nil, err 506 } 507 508 req, err := http.NewRequest("POST", u.String(), nil) 509 if err != nil { 510 return nil, err 511 } 512 513 req.Header.Set("Content-Type", "") 514 req.Header.Set("User-Agent", c.useragent) 515 516 if c.username != "" { 517 req.SetBasicAuth(c.username, c.password) 518 } 519 520 params := req.URL.Query() 521 params.Set("q", q.Command) 522 params.Set("db", q.Database) 523 if q.RetentionPolicy != "" { 524 params.Set("rp", q.RetentionPolicy) 525 } 526 params.Set("params", string(jsonParameters)) 527 if q.Chunked { 528 params.Set("chunked", "true") 529 if q.ChunkSize > 0 { 530 params.Set("chunk_size", strconv.Itoa(q.ChunkSize)) 531 } 532 } 533 534 if q.Precision != "" { 535 params.Set("epoch", q.Precision) 536 } 537 req.URL.RawQuery = params.Encode() 538 539 resp, err := c.httpClient.Do(req) 540 if err != nil { 541 return nil, err 542 } 543 defer resp.Body.Close() 544 545 // If we lack a X-Influxdb-Version header, then we didn't get a response from 
influxdb 546 // but instead some other service. If the error code is also a 500+ code, then some 547 // downstream loadbalancer/proxy/etc had an issue and we should report that. 548 if resp.Header.Get("X-Influxdb-Version") == "" && resp.StatusCode >= http.StatusInternalServerError { 549 body, err := ioutil.ReadAll(resp.Body) 550 if err != nil || len(body) == 0 { 551 return nil, fmt.Errorf("received status code %d from downstream server", resp.StatusCode) 552 } 553 554 return nil, fmt.Errorf("received status code %d from downstream server, with response body: %q", resp.StatusCode, body) 555 } 556 557 // If we get an unexpected content type, then it is also not from influx direct and therefore 558 // we want to know what we received and what status code was returned for debugging purposes. 559 if cType, _, _ := mime.ParseMediaType(resp.Header.Get("Content-Type")); cType != "application/json" { 560 // Read up to 1kb of the body to help identify downstream errors and limit the impact of things 561 // like downstream serving a large file 562 body, err := ioutil.ReadAll(io.LimitReader(resp.Body, 1024)) 563 if err != nil || len(body) == 0 { 564 return nil, fmt.Errorf("expected json response, got empty body, with status: %v", resp.StatusCode) 565 } 566 567 return nil, fmt.Errorf("expected json response, got %q, with status: %v and response body: %q", cType, resp.StatusCode, body) 568 } 569 570 var response Response 571 if q.Chunked { 572 cr := NewChunkedResponse(resp.Body) 573 for { 574 r, err := cr.NextResponse() 575 if err != nil { 576 // If we got an error while decoding the response, send that back. 577 return nil, err 578 } 579 580 if r == nil { 581 break 582 } 583 584 response.Results = append(response.Results, r.Results...) 
585 if r.Err != "" { 586 response.Err = r.Err 587 break 588 } 589 } 590 } else { 591 dec := json.NewDecoder(resp.Body) 592 dec.UseNumber() 593 decErr := dec.Decode(&response) 594 595 // ignore this error if we got an invalid status code 596 if decErr != nil && decErr.Error() == "EOF" && resp.StatusCode != http.StatusOK { 597 decErr = nil 598 } 599 // If we got a valid decode error, send that back 600 if decErr != nil { 601 return nil, fmt.Errorf("unable to decode json: received status code %d err: %s", resp.StatusCode, decErr) 602 } 603 } 604 605 // If we don't have an error in our json response, and didn't get statusOK 606 // then send back an error 607 if resp.StatusCode != http.StatusOK && response.Error() == nil { 608 return &response, fmt.Errorf("received status code %d from server", resp.StatusCode) 609 } 610 return &response, nil 611} 612 613// duplexReader reads responses and writes it to another writer while 614// satisfying the reader interface. 615type duplexReader struct { 616 r io.Reader 617 w io.Writer 618} 619 620func (r *duplexReader) Read(p []byte) (n int, err error) { 621 n, err = r.r.Read(p) 622 if err == nil { 623 r.w.Write(p[:n]) 624 } 625 return n, err 626} 627 628// ChunkedResponse represents a response from the server that 629// uses chunking to stream the output. 630type ChunkedResponse struct { 631 dec *json.Decoder 632 duplex *duplexReader 633 buf bytes.Buffer 634} 635 636// NewChunkedResponse reads a stream and produces responses from the stream. 637func NewChunkedResponse(r io.Reader) *ChunkedResponse { 638 resp := &ChunkedResponse{} 639 resp.duplex = &duplexReader{r: r, w: &resp.buf} 640 resp.dec = json.NewDecoder(resp.duplex) 641 resp.dec.UseNumber() 642 return resp 643} 644 645// NextResponse reads the next line of the stream and returns a response. 
646func (r *ChunkedResponse) NextResponse() (*Response, error) { 647 var response Response 648 649 if err := r.dec.Decode(&response); err != nil { 650 if err == io.EOF { 651 return nil, nil 652 } 653 // A decoding error happened. This probably means the server crashed 654 // and sent a last-ditch error message to us. Ensure we have read the 655 // entirety of the connection to get any remaining error text. 656 io.Copy(ioutil.Discard, r.duplex) 657 return nil, errors.New(strings.TrimSpace(r.buf.String())) 658 } 659 660 r.buf.Reset() 661 return &response, nil 662} 663