449 lines
		
	
	
		
			9.6 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			449 lines
		
	
	
		
			9.6 KiB
		
	
	
	
		
			Go
		
	
	
	
| package influxdb
 | |
| 
 | |
| import (
 | |
| 	"compress/gzip"
 | |
| 	"context"
 | |
| 	"crypto/tls"
 | |
| 	"encoding/json"
 | |
| 	"fmt"
 | |
| 	"io"
 | |
| 	"log"
 | |
| 	"net"
 | |
| 	"net/http"
 | |
| 	"net/url"
 | |
| 	"path"
 | |
| 	"strings"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/influxdata/telegraf"
 | |
| 	"github.com/influxdata/telegraf/plugins/serializers/influx"
 | |
| )
 | |
| 
 | |
| type APIErrorType int
 | |
| 
 | |
| const (
 | |
| 	_ APIErrorType = iota
 | |
| 	DatabaseNotFound
 | |
| )
 | |
| 
 | |
| const (
 | |
| 	defaultRequestTimeout = time.Second * 5
 | |
| 	defaultDatabase       = "telegraf"
 | |
| 	defaultUserAgent      = "telegraf"
 | |
| 
 | |
| 	errStringDatabaseNotFound      = "database not found"
 | |
| 	errStringHintedHandoffNotEmpty = "hinted handoff queue not empty"
 | |
| 	errStringPartialWrite          = "partial write"
 | |
| 	errStringPointsBeyondRP        = "points beyond retention policy"
 | |
| 	errStringUnableToParse         = "unable to parse"
 | |
| )
 | |
| 
 | |
| var (
 | |
| 
 | |
| 	// Escape an identifier in InfluxQL.
 | |
| 	escapeIdentifier = strings.NewReplacer(
 | |
| 		"\n", `\n`,
 | |
| 		`\`, `\\`,
 | |
| 		`"`, `\"`,
 | |
| 	)
 | |
| )
 | |
| 
 | |
| // APIError is an error reported by the InfluxDB server
 | |
| type APIError struct {
 | |
| 	StatusCode  int
 | |
| 	Title       string
 | |
| 	Description string
 | |
| 	Type        APIErrorType
 | |
| }
 | |
| 
 | |
| func (e APIError) Error() string {
 | |
| 	if e.Description != "" {
 | |
| 		return fmt.Sprintf("%s: %s", e.Title, e.Description)
 | |
| 	}
 | |
| 	return e.Title
 | |
| }
 | |
| 
 | |
| // QueryResponse is the response body from the /query endpoint
 | |
| type QueryResponse struct {
 | |
| 	Results []QueryResult `json:"results"`
 | |
| }
 | |
| 
 | |
| type QueryResult struct {
 | |
| 	Err string `json:"error,omitempty"`
 | |
| }
 | |
| 
 | |
| func (r QueryResponse) Error() string {
 | |
| 	if len(r.Results) > 0 {
 | |
| 		return r.Results[0].Err
 | |
| 	}
 | |
| 	return ""
 | |
| }
 | |
| 
 | |
| // WriteResponse is the response body from the /write endpoint
 | |
| type WriteResponse struct {
 | |
| 	Err string `json:"error,omitempty"`
 | |
| }
 | |
| 
 | |
| func (r WriteResponse) Error() string {
 | |
| 	return r.Err
 | |
| }
 | |
| 
 | |
| type HTTPConfig struct {
 | |
| 	URL             *url.URL
 | |
| 	UserAgent       string
 | |
| 	Timeout         time.Duration
 | |
| 	Username        string
 | |
| 	Password        string
 | |
| 	TLSConfig       *tls.Config
 | |
| 	Proxy           *url.URL
 | |
| 	Headers         map[string]string
 | |
| 	ContentEncoding string
 | |
| 	Database        string
 | |
| 	RetentionPolicy string
 | |
| 	Consistency     string
 | |
| 
 | |
| 	InfluxUintSupport bool `toml:"influx_uint_support"`
 | |
| 	Serializer        *influx.Serializer
 | |
| }
 | |
| 
 | |
| type httpClient struct {
 | |
| 	WriteURL        string
 | |
| 	QueryURL        string
 | |
| 	ContentEncoding string
 | |
| 	Timeout         time.Duration
 | |
| 	Username        string
 | |
| 	Password        string
 | |
| 	Headers         map[string]string
 | |
| 
 | |
| 	client     *http.Client
 | |
| 	serializer *influx.Serializer
 | |
| 	url        *url.URL
 | |
| 	database   string
 | |
| }
 | |
| 
 | |
| func NewHTTPClient(config *HTTPConfig) (*httpClient, error) {
 | |
| 	if config.URL == nil {
 | |
| 		return nil, ErrMissingURL
 | |
| 	}
 | |
| 
 | |
| 	database := config.Database
 | |
| 	if database == "" {
 | |
| 		database = defaultDatabase
 | |
| 	}
 | |
| 
 | |
| 	timeout := config.Timeout
 | |
| 	if timeout == 0 {
 | |
| 		timeout = defaultRequestTimeout
 | |
| 	}
 | |
| 
 | |
| 	userAgent := config.UserAgent
 | |
| 	if userAgent == "" {
 | |
| 		userAgent = defaultUserAgent
 | |
| 	}
 | |
| 
 | |
| 	var headers = make(map[string]string, len(config.Headers)+1)
 | |
| 	headers["User-Agent"] = userAgent
 | |
| 	for k, v := range config.Headers {
 | |
| 		headers[k] = v
 | |
| 	}
 | |
| 
 | |
| 	var proxy func(*http.Request) (*url.URL, error)
 | |
| 	if config.Proxy != nil {
 | |
| 		proxy = http.ProxyURL(config.Proxy)
 | |
| 	} else {
 | |
| 		proxy = http.ProxyFromEnvironment
 | |
| 	}
 | |
| 
 | |
| 	serializer := config.Serializer
 | |
| 	if serializer == nil {
 | |
| 		serializer = influx.NewSerializer()
 | |
| 	}
 | |
| 
 | |
| 	writeURL, err := makeWriteURL(
 | |
| 		config.URL,
 | |
| 		database,
 | |
| 		config.RetentionPolicy,
 | |
| 		config.Consistency)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	queryURL, err := makeQueryURL(config.URL)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	var transport *http.Transport
 | |
| 	switch config.URL.Scheme {
 | |
| 	case "http", "https":
 | |
| 		transport = &http.Transport{
 | |
| 			Proxy:           proxy,
 | |
| 			TLSClientConfig: config.TLSConfig,
 | |
| 		}
 | |
| 	case "unix":
 | |
| 		transport = &http.Transport{
 | |
| 			Dial: func(_, _ string) (net.Conn, error) {
 | |
| 				return net.DialTimeout(
 | |
| 					config.URL.Scheme,
 | |
| 					config.URL.Path,
 | |
| 					defaultRequestTimeout,
 | |
| 				)
 | |
| 			},
 | |
| 		}
 | |
| 	default:
 | |
| 		return nil, fmt.Errorf("unsupported scheme %q", config.URL.Scheme)
 | |
| 	}
 | |
| 
 | |
| 	client := &httpClient{
 | |
| 		serializer: serializer,
 | |
| 		client: &http.Client{
 | |
| 			Timeout:   timeout,
 | |
| 			Transport: transport,
 | |
| 		},
 | |
| 		database:        database,
 | |
| 		url:             config.URL,
 | |
| 		WriteURL:        writeURL,
 | |
| 		QueryURL:        queryURL,
 | |
| 		ContentEncoding: config.ContentEncoding,
 | |
| 		Timeout:         timeout,
 | |
| 		Username:        config.Username,
 | |
| 		Password:        config.Password,
 | |
| 		Headers:         headers,
 | |
| 	}
 | |
| 	return client, nil
 | |
| }
 | |
| 
 | |
| // URL returns the origin URL that this client connects too.
 | |
| func (c *httpClient) URL() string {
 | |
| 	return c.url.String()
 | |
| }
 | |
| 
 | |
| // URL returns the database that this client connects too.
 | |
| func (c *httpClient) Database() string {
 | |
| 	return c.database
 | |
| }
 | |
| 
 | |
| // CreateDatabase attemps to create a new database in the InfluxDB server.
 | |
| // Note that some names are not allowed by the server, notably those with
 | |
| // non-printable characters or slashes.
 | |
| func (c *httpClient) CreateDatabase(ctx context.Context) error {
 | |
| 	query := fmt.Sprintf(`CREATE DATABASE "%s"`,
 | |
| 		escapeIdentifier.Replace(c.database))
 | |
| 
 | |
| 	req, err := c.makeQueryRequest(query)
 | |
| 
 | |
| 	resp, err := c.client.Do(req.WithContext(ctx))
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	defer resp.Body.Close()
 | |
| 
 | |
| 	queryResp := &QueryResponse{}
 | |
| 	dec := json.NewDecoder(resp.Body)
 | |
| 	err = dec.Decode(queryResp)
 | |
| 
 | |
| 	if err != nil {
 | |
| 		if resp.StatusCode == 200 {
 | |
| 			return nil
 | |
| 		}
 | |
| 
 | |
| 		return &APIError{
 | |
| 			StatusCode: resp.StatusCode,
 | |
| 			Title:      resp.Status,
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Even with a 200 response there can be an error
 | |
| 	if resp.StatusCode == http.StatusOK && queryResp.Error() == "" {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	return &APIError{
 | |
| 		StatusCode:  resp.StatusCode,
 | |
| 		Title:       resp.Status,
 | |
| 		Description: queryResp.Error(),
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Write sends the metrics to InfluxDB
 | |
| func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error {
 | |
| 	var err error
 | |
| 
 | |
| 	reader := influx.NewReader(metrics, c.serializer)
 | |
| 	req, err := c.makeWriteRequest(reader)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	resp, err := c.client.Do(req.WithContext(ctx))
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	defer resp.Body.Close()
 | |
| 
 | |
| 	if resp.StatusCode == http.StatusNoContent {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	writeResp := &WriteResponse{}
 | |
| 	dec := json.NewDecoder(resp.Body)
 | |
| 
 | |
| 	var desc string
 | |
| 	err = dec.Decode(writeResp)
 | |
| 	if err == nil {
 | |
| 		desc = writeResp.Err
 | |
| 	}
 | |
| 
 | |
| 	if strings.Contains(desc, errStringDatabaseNotFound) {
 | |
| 		return &APIError{
 | |
| 			StatusCode:  resp.StatusCode,
 | |
| 			Title:       resp.Status,
 | |
| 			Description: desc,
 | |
| 			Type:        DatabaseNotFound,
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// This "error" is an informational message about the state of the
 | |
| 	// InfluxDB cluster.
 | |
| 	if strings.Contains(desc, errStringHintedHandoffNotEmpty) {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	// Points beyond retention policy is returned when points are immediately
 | |
| 	// discarded for being older than the retention policy.  Usually this not
 | |
| 	// a cause for concern and we don't want to retry.
 | |
| 	if strings.Contains(desc, errStringPointsBeyondRP) {
 | |
| 		log.Printf("W! [outputs.influxdb]: when writing to [%s]: received error %v",
 | |
| 			c.URL(), desc)
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	// Other partial write errors, such as "field type conflict", are not
 | |
| 	// correctable at this point and so the point is dropped instead of
 | |
| 	// retrying.
 | |
| 	if strings.Contains(desc, errStringPartialWrite) {
 | |
| 		log.Printf("E! [outputs.influxdb]: when writing to [%s]: received error %v; discarding points",
 | |
| 			c.URL(), desc)
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	// This error indicates a bug in either Telegraf line protocol
 | |
| 	// serialization, retries would not be successful.
 | |
| 	if strings.Contains(desc, errStringUnableToParse) {
 | |
| 		log.Printf("E! [outputs.influxdb]: when writing to [%s]: received error %v; discarding points",
 | |
| 			c.URL(), desc)
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	return &APIError{
 | |
| 		StatusCode:  resp.StatusCode,
 | |
| 		Title:       resp.Status,
 | |
| 		Description: desc,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (c *httpClient) makeQueryRequest(query string) (*http.Request, error) {
 | |
| 	params := url.Values{}
 | |
| 	params.Set("q", query)
 | |
| 	form := strings.NewReader(params.Encode())
 | |
| 
 | |
| 	req, err := http.NewRequest("POST", c.QueryURL, form)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
 | |
| 	c.addHeaders(req)
 | |
| 
 | |
| 	return req, nil
 | |
| }
 | |
| 
 | |
| func (c *httpClient) makeWriteRequest(body io.Reader) (*http.Request, error) {
 | |
| 	var err error
 | |
| 	if c.ContentEncoding == "gzip" {
 | |
| 		body, err = compressWithGzip(body)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	req, err := http.NewRequest("POST", c.WriteURL, body)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	req.Header.Set("Content-Type", "text/plain; charset=utf-8")
 | |
| 	c.addHeaders(req)
 | |
| 
 | |
| 	if c.ContentEncoding == "gzip" {
 | |
| 		req.Header.Set("Content-Encoding", "gzip")
 | |
| 	}
 | |
| 
 | |
| 	return req, nil
 | |
| }
 | |
| 
 | |
| func compressWithGzip(data io.Reader) (io.Reader, error) {
 | |
| 	pr, pw := io.Pipe()
 | |
| 	gw := gzip.NewWriter(pw)
 | |
| 	var err error
 | |
| 
 | |
| 	go func() {
 | |
| 		_, err = io.Copy(gw, data)
 | |
| 		gw.Close()
 | |
| 		pw.Close()
 | |
| 	}()
 | |
| 
 | |
| 	return pr, err
 | |
| }
 | |
| 
 | |
| func (c *httpClient) addHeaders(req *http.Request) {
 | |
| 	if c.Username != "" || c.Password != "" {
 | |
| 		req.SetBasicAuth(c.Username, c.Password)
 | |
| 	}
 | |
| 
 | |
| 	for header, value := range c.Headers {
 | |
| 		req.Header.Set(header, value)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func makeWriteURL(loc *url.URL, db, rp, consistency string) (string, error) {
 | |
| 	params := url.Values{}
 | |
| 	params.Set("db", db)
 | |
| 
 | |
| 	if rp != "" {
 | |
| 		params.Set("rp", rp)
 | |
| 	}
 | |
| 
 | |
| 	if consistency != "one" && consistency != "" {
 | |
| 		params.Set("consistency", consistency)
 | |
| 	}
 | |
| 
 | |
| 	u := *loc
 | |
| 	switch u.Scheme {
 | |
| 	case "unix":
 | |
| 		u.Scheme = "http"
 | |
| 		u.Host = "127.0.0.1"
 | |
| 		u.Path = "/write"
 | |
| 	case "http", "https":
 | |
| 		u.Path = path.Join(u.Path, "write")
 | |
| 	default:
 | |
| 		return "", fmt.Errorf("unsupported scheme: %q", loc.Scheme)
 | |
| 	}
 | |
| 	u.RawQuery = params.Encode()
 | |
| 	return u.String(), nil
 | |
| }
 | |
| 
 | |
| func makeQueryURL(loc *url.URL) (string, error) {
 | |
| 	u := *loc
 | |
| 	switch u.Scheme {
 | |
| 	case "unix":
 | |
| 		u.Scheme = "http"
 | |
| 		u.Host = "127.0.0.1"
 | |
| 		u.Path = "/query"
 | |
| 	case "http", "https":
 | |
| 		u.Path = path.Join(u.Path, "query")
 | |
| 	default:
 | |
| 		return "", fmt.Errorf("unsupported scheme: %q", loc.Scheme)
 | |
| 	}
 | |
| 	return u.String(), nil
 | |
| }
 |