332 lines
7.5 KiB
Go
332 lines
7.5 KiB
Go
package influxdb_v2
|
|
|
|
import (
|
|
"context"
|
|
"crypto/tls"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"net"
|
|
"net/http"
|
|
"net/url"
|
|
"path"
|
|
"strconv"
|
|
"time"
|
|
|
|
"github.com/influxdata/telegraf"
|
|
"github.com/influxdata/telegraf/internal"
|
|
"github.com/influxdata/telegraf/plugins/serializers/influx"
|
|
)
|
|
|
|
type APIError struct {
|
|
StatusCode int
|
|
Title string
|
|
Description string
|
|
}
|
|
|
|
func (e APIError) Error() string {
|
|
if e.Description != "" {
|
|
return fmt.Sprintf("%s: %s", e.Title, e.Description)
|
|
}
|
|
return e.Title
|
|
}
|
|
|
|
const (
|
|
defaultRequestTimeout = time.Second * 5
|
|
defaultMaxWait = 10 // seconds
|
|
defaultDatabase = "telegraf"
|
|
)
|
|
|
|
type HTTPConfig struct {
|
|
URL *url.URL
|
|
Token string
|
|
Organization string
|
|
Bucket string
|
|
BucketTag string
|
|
ExcludeBucketTag bool
|
|
Timeout time.Duration
|
|
Headers map[string]string
|
|
Proxy *url.URL
|
|
UserAgent string
|
|
ContentEncoding string
|
|
TLSConfig *tls.Config
|
|
|
|
Serializer *influx.Serializer
|
|
}
|
|
|
|
type httpClient struct {
|
|
ContentEncoding string
|
|
Timeout time.Duration
|
|
Headers map[string]string
|
|
Organization string
|
|
Bucket string
|
|
BucketTag string
|
|
ExcludeBucketTag bool
|
|
|
|
client *http.Client
|
|
serializer *influx.Serializer
|
|
url *url.URL
|
|
retryTime time.Time
|
|
}
|
|
|
|
func NewHTTPClient(config *HTTPConfig) (*httpClient, error) {
|
|
if config.URL == nil {
|
|
return nil, ErrMissingURL
|
|
}
|
|
|
|
timeout := config.Timeout
|
|
if timeout == 0 {
|
|
timeout = defaultRequestTimeout
|
|
}
|
|
|
|
userAgent := config.UserAgent
|
|
if userAgent == "" {
|
|
userAgent = "Telegraf/" + internal.Version()
|
|
}
|
|
|
|
var headers = make(map[string]string, len(config.Headers)+2)
|
|
headers["User-Agent"] = userAgent
|
|
headers["Authorization"] = "Token " + config.Token
|
|
for k, v := range config.Headers {
|
|
headers[k] = v
|
|
}
|
|
|
|
var proxy func(*http.Request) (*url.URL, error)
|
|
if config.Proxy != nil {
|
|
proxy = http.ProxyURL(config.Proxy)
|
|
} else {
|
|
proxy = http.ProxyFromEnvironment
|
|
}
|
|
|
|
serializer := config.Serializer
|
|
if serializer == nil {
|
|
serializer = influx.NewSerializer()
|
|
}
|
|
|
|
var transport *http.Transport
|
|
switch config.URL.Scheme {
|
|
case "http", "https":
|
|
transport = &http.Transport{
|
|
Proxy: proxy,
|
|
TLSClientConfig: config.TLSConfig,
|
|
}
|
|
case "unix":
|
|
transport = &http.Transport{
|
|
Dial: func(_, _ string) (net.Conn, error) {
|
|
return net.DialTimeout(
|
|
config.URL.Scheme,
|
|
config.URL.Path,
|
|
timeout,
|
|
)
|
|
},
|
|
}
|
|
default:
|
|
return nil, fmt.Errorf("unsupported scheme %q", config.URL.Scheme)
|
|
}
|
|
|
|
client := &httpClient{
|
|
serializer: serializer,
|
|
client: &http.Client{
|
|
Timeout: timeout,
|
|
Transport: transport,
|
|
},
|
|
url: config.URL,
|
|
ContentEncoding: config.ContentEncoding,
|
|
Timeout: timeout,
|
|
Headers: headers,
|
|
Organization: config.Organization,
|
|
Bucket: config.Bucket,
|
|
BucketTag: config.BucketTag,
|
|
ExcludeBucketTag: config.ExcludeBucketTag,
|
|
}
|
|
return client, nil
|
|
}
|
|
|
|
// URL returns the origin URL that this client connects too.
|
|
func (c *httpClient) URL() string {
|
|
return c.url.String()
|
|
}
|
|
|
|
type genericRespError struct {
|
|
Code string
|
|
Message string
|
|
Line *int32
|
|
MaxLength *int32
|
|
}
|
|
|
|
func (g genericRespError) Error() string {
|
|
errString := fmt.Sprintf("%s: %s", g.Code, g.Message)
|
|
if g.Line != nil {
|
|
return fmt.Sprintf("%s - line[%d]", errString, g.Line)
|
|
} else if g.MaxLength != nil {
|
|
return fmt.Sprintf("%s - maxlen[%d]", errString, g.MaxLength)
|
|
}
|
|
return errString
|
|
}
|
|
|
|
func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error {
|
|
if c.retryTime.After(time.Now()) {
|
|
return errors.New("Retry time has not elapsed")
|
|
}
|
|
|
|
batches := make(map[string][]telegraf.Metric)
|
|
if c.BucketTag == "" {
|
|
err := c.writeBatch(ctx, c.Bucket, metrics)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
} else {
|
|
for _, metric := range metrics {
|
|
bucket, ok := metric.GetTag(c.BucketTag)
|
|
if !ok {
|
|
bucket = c.Bucket
|
|
}
|
|
|
|
if _, ok := batches[bucket]; !ok {
|
|
batches[bucket] = make([]telegraf.Metric, 0)
|
|
}
|
|
|
|
if c.ExcludeBucketTag {
|
|
metric.RemoveTag(c.BucketTag)
|
|
}
|
|
|
|
batches[bucket] = append(batches[bucket], metric)
|
|
}
|
|
|
|
for bucket, batch := range batches {
|
|
err := c.writeBatch(ctx, bucket, batch)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []telegraf.Metric) error {
|
|
url, err := makeWriteURL(*c.url, c.Organization, bucket)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
reader := influx.NewReader(metrics, c.serializer)
|
|
req, err := c.makeWriteRequest(url, reader)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
resp, err := c.client.Do(req.WithContext(ctx))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode == http.StatusNoContent {
|
|
return nil
|
|
}
|
|
|
|
writeResp := &genericRespError{}
|
|
err = json.NewDecoder(resp.Body).Decode(writeResp)
|
|
desc := writeResp.Error()
|
|
if err != nil {
|
|
desc = resp.Status
|
|
}
|
|
|
|
switch resp.StatusCode {
|
|
case http.StatusBadRequest, http.StatusRequestEntityTooLarge:
|
|
log.Printf("E! [outputs.influxdb_v2] Failed to write metric: %s\n", desc)
|
|
return nil
|
|
case http.StatusUnauthorized, http.StatusForbidden:
|
|
return fmt.Errorf("failed to write metric: %s", desc)
|
|
case http.StatusTooManyRequests:
|
|
retryAfter := resp.Header.Get("Retry-After")
|
|
retry, err := strconv.Atoi(retryAfter)
|
|
if err != nil {
|
|
return errors.New("rate limit exceeded")
|
|
}
|
|
if retry > defaultMaxWait {
|
|
retry = defaultMaxWait
|
|
}
|
|
c.retryTime = time.Now().Add(time.Duration(retry) * time.Second)
|
|
return fmt.Errorf("waiting %ds for server before sending metric again", retry)
|
|
case http.StatusServiceUnavailable:
|
|
retryAfter := resp.Header.Get("Retry-After")
|
|
retry, err := strconv.Atoi(retryAfter)
|
|
if err != nil {
|
|
return errors.New("server responded: service unavailable")
|
|
}
|
|
if retry > defaultMaxWait {
|
|
retry = defaultMaxWait
|
|
}
|
|
c.retryTime = time.Now().Add(time.Duration(retry) * time.Second)
|
|
return fmt.Errorf("waiting %ds for server before sending metric again", retry)
|
|
}
|
|
|
|
// This is only until platform spec is fully implemented. As of the
|
|
// time of writing, there is no error body returned.
|
|
if xErr := resp.Header.Get("X-Influx-Error"); xErr != "" {
|
|
desc = fmt.Sprintf("%s; %s", desc, xErr)
|
|
}
|
|
|
|
return &APIError{
|
|
StatusCode: resp.StatusCode,
|
|
Title: resp.Status,
|
|
Description: desc,
|
|
}
|
|
}
|
|
|
|
func (c *httpClient) makeWriteRequest(url string, body io.Reader) (*http.Request, error) {
|
|
var err error
|
|
if c.ContentEncoding == "gzip" {
|
|
body, err = internal.CompressWithGzip(body)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
req, err := http.NewRequest("POST", url, body)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
req.Header.Set("Content-Type", "text/plain; charset=utf-8")
|
|
c.addHeaders(req)
|
|
|
|
if c.ContentEncoding == "gzip" {
|
|
req.Header.Set("Content-Encoding", "gzip")
|
|
}
|
|
|
|
return req, nil
|
|
}
|
|
|
|
func (c *httpClient) addHeaders(req *http.Request) {
|
|
for header, value := range c.Headers {
|
|
req.Header.Set(header, value)
|
|
}
|
|
}
|
|
|
|
func makeWriteURL(loc url.URL, org, bucket string) (string, error) {
|
|
params := url.Values{}
|
|
params.Set("bucket", bucket)
|
|
params.Set("org", org)
|
|
|
|
switch loc.Scheme {
|
|
case "unix":
|
|
loc.Scheme = "http"
|
|
loc.Host = "127.0.0.1"
|
|
loc.Path = "/api/v2/write"
|
|
case "http", "https":
|
|
loc.Path = path.Join(loc.Path, "/api/v2/write")
|
|
default:
|
|
return "", fmt.Errorf("unsupported scheme: %q", loc.Scheme)
|
|
}
|
|
loc.RawQuery = params.Encode()
|
|
return loc.String(), nil
|
|
}
|
|
|
|
func (c *httpClient) Close() {
|
|
internal.CloseIdleConnections(c.client)
|
|
}
|