278 lines
		
	
	
		
			6.0 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			278 lines
		
	
	
		
			6.0 KiB
		
	
	
	
		
			Go
		
	
	
	
package influxdb_v2
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"crypto/tls"
 | 
						|
	"encoding/json"
 | 
						|
	"errors"
 | 
						|
	"fmt"
 | 
						|
	"io"
 | 
						|
	"log"
 | 
						|
	"net"
 | 
						|
	"net/http"
 | 
						|
	"net/url"
 | 
						|
	"path"
 | 
						|
	"strconv"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/influxdata/telegraf"
 | 
						|
	"github.com/influxdata/telegraf/internal"
 | 
						|
	"github.com/influxdata/telegraf/plugins/serializers/influx"
 | 
						|
)
 | 
						|
 | 
						|
type APIErrorType int
 | 
						|
 | 
						|
type APIError struct {
 | 
						|
	StatusCode  int
 | 
						|
	Title       string
 | 
						|
	Description string
 | 
						|
	Type        APIErrorType
 | 
						|
}
 | 
						|
 | 
						|
func (e APIError) Error() string {
 | 
						|
	if e.Description != "" {
 | 
						|
		return fmt.Sprintf("%s: %s", e.Title, e.Description)
 | 
						|
	}
 | 
						|
	return e.Title
 | 
						|
}
 | 
						|
 | 
						|
const (
 | 
						|
	defaultRequestTimeout = time.Second * 5
 | 
						|
	defaultMaxWait        = 10 // seconds
 | 
						|
	defaultDatabase       = "telegraf"
 | 
						|
)
 | 
						|
 | 
						|
type HTTPConfig struct {
 | 
						|
	URL             *url.URL
 | 
						|
	Token           string
 | 
						|
	Organization    string
 | 
						|
	Bucket          string
 | 
						|
	Timeout         time.Duration
 | 
						|
	Headers         map[string]string
 | 
						|
	Proxy           *url.URL
 | 
						|
	UserAgent       string
 | 
						|
	ContentEncoding string
 | 
						|
	TLSConfig       *tls.Config
 | 
						|
 | 
						|
	Serializer *influx.Serializer
 | 
						|
}
 | 
						|
 | 
						|
type httpClient struct {
 | 
						|
	WriteURL        string
 | 
						|
	ContentEncoding string
 | 
						|
	Timeout         time.Duration
 | 
						|
	Headers         map[string]string
 | 
						|
 | 
						|
	client     *http.Client
 | 
						|
	serializer *influx.Serializer
 | 
						|
	url        *url.URL
 | 
						|
	retryTime  time.Time
 | 
						|
}
 | 
						|
 | 
						|
func NewHTTPClient(config *HTTPConfig) (*httpClient, error) {
 | 
						|
	if config.URL == nil {
 | 
						|
		return nil, ErrMissingURL
 | 
						|
	}
 | 
						|
 | 
						|
	timeout := config.Timeout
 | 
						|
	if timeout == 0 {
 | 
						|
		timeout = defaultRequestTimeout
 | 
						|
	}
 | 
						|
 | 
						|
	userAgent := config.UserAgent
 | 
						|
	if userAgent == "" {
 | 
						|
		userAgent = "Telegraf/" + internal.Version()
 | 
						|
	}
 | 
						|
 | 
						|
	var headers = make(map[string]string, len(config.Headers)+2)
 | 
						|
	headers["User-Agent"] = userAgent
 | 
						|
	headers["Authorization"] = "Token " + config.Token
 | 
						|
	for k, v := range config.Headers {
 | 
						|
		headers[k] = v
 | 
						|
	}
 | 
						|
 | 
						|
	var proxy func(*http.Request) (*url.URL, error)
 | 
						|
	if config.Proxy != nil {
 | 
						|
		proxy = http.ProxyURL(config.Proxy)
 | 
						|
	} else {
 | 
						|
		proxy = http.ProxyFromEnvironment
 | 
						|
	}
 | 
						|
 | 
						|
	serializer := config.Serializer
 | 
						|
	if serializer == nil {
 | 
						|
		serializer = influx.NewSerializer()
 | 
						|
	}
 | 
						|
 | 
						|
	writeURL, err := makeWriteURL(
 | 
						|
		*config.URL,
 | 
						|
		config.Organization,
 | 
						|
		config.Bucket)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	var transport *http.Transport
 | 
						|
	switch config.URL.Scheme {
 | 
						|
	case "http", "https":
 | 
						|
		transport = &http.Transport{
 | 
						|
			Proxy:           proxy,
 | 
						|
			TLSClientConfig: config.TLSConfig,
 | 
						|
		}
 | 
						|
	case "unix":
 | 
						|
		transport = &http.Transport{
 | 
						|
			Dial: func(_, _ string) (net.Conn, error) {
 | 
						|
				return net.DialTimeout(
 | 
						|
					config.URL.Scheme,
 | 
						|
					config.URL.Path,
 | 
						|
					timeout,
 | 
						|
				)
 | 
						|
			},
 | 
						|
		}
 | 
						|
	default:
 | 
						|
		return nil, fmt.Errorf("unsupported scheme %q", config.URL.Scheme)
 | 
						|
	}
 | 
						|
 | 
						|
	client := &httpClient{
 | 
						|
		serializer: serializer,
 | 
						|
		client: &http.Client{
 | 
						|
			Timeout:   timeout,
 | 
						|
			Transport: transport,
 | 
						|
		},
 | 
						|
		url:             config.URL,
 | 
						|
		WriteURL:        writeURL,
 | 
						|
		ContentEncoding: config.ContentEncoding,
 | 
						|
		Timeout:         timeout,
 | 
						|
		Headers:         headers,
 | 
						|
	}
 | 
						|
	return client, nil
 | 
						|
}
 | 
						|
 | 
						|
// URL returns the origin URL that this client connects too.
 | 
						|
func (c *httpClient) URL() string {
 | 
						|
	return c.url.String()
 | 
						|
}
 | 
						|
 | 
						|
type genericRespError struct {
 | 
						|
	Code      string
 | 
						|
	Message   string
 | 
						|
	Line      *int32
 | 
						|
	MaxLength *int32
 | 
						|
}
 | 
						|
 | 
						|
func (g genericRespError) Error() string {
 | 
						|
	errString := fmt.Sprintf("%s: %s", g.Code, g.Message)
 | 
						|
	if g.Line != nil {
 | 
						|
		return fmt.Sprintf("%s - line[%d]", errString, g.Line)
 | 
						|
	} else if g.MaxLength != nil {
 | 
						|
		return fmt.Sprintf("%s - maxlen[%d]", errString, g.MaxLength)
 | 
						|
	}
 | 
						|
	return errString
 | 
						|
}
 | 
						|
 | 
						|
func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error {
 | 
						|
	if c.retryTime.After(time.Now()) {
 | 
						|
		return errors.New("Retry time has not elapsed")
 | 
						|
	}
 | 
						|
	reader := influx.NewReader(metrics, c.serializer)
 | 
						|
	req, err := c.makeWriteRequest(reader)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	resp, err := c.client.Do(req.WithContext(ctx))
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	defer resp.Body.Close()
 | 
						|
 | 
						|
	if resp.StatusCode == http.StatusNoContent {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	writeResp := &genericRespError{}
 | 
						|
	err = json.NewDecoder(resp.Body).Decode(writeResp)
 | 
						|
	desc := writeResp.Error()
 | 
						|
	if err != nil {
 | 
						|
		desc = err.Error()
 | 
						|
	}
 | 
						|
 | 
						|
	switch resp.StatusCode {
 | 
						|
	case http.StatusBadRequest, http.StatusUnauthorized,
 | 
						|
		http.StatusForbidden, http.StatusRequestEntityTooLarge:
 | 
						|
		log.Printf("E! [outputs.influxdb_v2] Failed to write metric: %s\n", desc)
 | 
						|
		return nil
 | 
						|
	case http.StatusTooManyRequests, http.StatusServiceUnavailable:
 | 
						|
		retryAfter := resp.Header.Get("Retry-After")
 | 
						|
		retry, err := strconv.Atoi(retryAfter)
 | 
						|
		if err != nil {
 | 
						|
			retry = 0
 | 
						|
		}
 | 
						|
		if retry > defaultMaxWait {
 | 
						|
			retry = defaultMaxWait
 | 
						|
		}
 | 
						|
		c.retryTime = time.Now().Add(time.Duration(retry) * time.Second)
 | 
						|
		return fmt.Errorf("Waiting %ds for server before sending metric again", retry)
 | 
						|
	}
 | 
						|
 | 
						|
	// This is only until platform spec is fully implemented. As of the
 | 
						|
	// time of writing, there is no error body returned.
 | 
						|
	if xErr := resp.Header.Get("X-Influx-Error"); xErr != "" {
 | 
						|
		desc = fmt.Sprintf("%s; %s", desc, xErr)
 | 
						|
	}
 | 
						|
 | 
						|
	return &APIError{
 | 
						|
		StatusCode:  resp.StatusCode,
 | 
						|
		Title:       resp.Status,
 | 
						|
		Description: desc,
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (c *httpClient) makeWriteRequest(body io.Reader) (*http.Request, error) {
 | 
						|
	var err error
 | 
						|
	if c.ContentEncoding == "gzip" {
 | 
						|
		body, err = internal.CompressWithGzip(body)
 | 
						|
		if err != nil {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	req, err := http.NewRequest("POST", c.WriteURL, body)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	req.Header.Set("Content-Type", "text/plain; charset=utf-8")
 | 
						|
	c.addHeaders(req)
 | 
						|
 | 
						|
	if c.ContentEncoding == "gzip" {
 | 
						|
		req.Header.Set("Content-Encoding", "gzip")
 | 
						|
	}
 | 
						|
 | 
						|
	return req, nil
 | 
						|
}
 | 
						|
 | 
						|
func (c *httpClient) addHeaders(req *http.Request) {
 | 
						|
	for header, value := range c.Headers {
 | 
						|
		req.Header.Set(header, value)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func makeWriteURL(loc url.URL, org, bucket string) (string, error) {
 | 
						|
	params := url.Values{}
 | 
						|
	params.Set("bucket", bucket)
 | 
						|
	params.Set("org", org)
 | 
						|
 | 
						|
	switch loc.Scheme {
 | 
						|
	case "unix":
 | 
						|
		loc.Scheme = "http"
 | 
						|
		loc.Host = "127.0.0.1"
 | 
						|
		loc.Path = "/api/v2/write"
 | 
						|
	case "http", "https":
 | 
						|
		loc.Path = path.Join(loc.Path, "/api/v2/write")
 | 
						|
	default:
 | 
						|
		return "", fmt.Errorf("unsupported scheme: %q", loc.Scheme)
 | 
						|
	}
 | 
						|
	loc.RawQuery = params.Encode()
 | 
						|
	return loc.String(), nil
 | 
						|
}
 |