// Package influxdb implements the Telegraf output plugin that writes metrics
// to InfluxDB over HTTP(S) or UDP.
package influxdb

import (
	"context"
	"errors"
	"fmt"
	"log"
	"math/rand"
	"net/url"
	"time"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/internal"
	"github.com/influxdata/telegraf/plugins/outputs"
	"github.com/influxdata/telegraf/plugins/serializers/influx"
)

var (
	// defaultURL is used by Connect when neither `urls` nor the deprecated
	// `url` option is set.
	defaultURL = "http://localhost:8086"

	// ErrMissingURL reports a missing URL. It is not referenced in this file;
	// presumably it is returned by the client constructors (verify there).
	ErrMissingURL = errors.New("missing URL")
)

// Client is the transport abstraction used by the plugin. Connect builds one
// Client per configured URL and Write picks among them.
type Client interface {
	// Write sends the metrics to the server behind this client.
	Write(context.Context, []telegraf.Metric) error
	// CreateDatabase asks the server to create the client's database.
	CreateDatabase(ctx context.Context) error

	// URL returns the address of the client; used here for log messages.
	URL() string
	// Database returns the database name; used here for log messages.
	Database() string
}

// InfluxDB struct is the primary data structure for the plugin
type InfluxDB struct {
	URL              string   // url deprecated in 0.1.9; use urls
	URLs             []string `toml:"urls"`
	Username         string
	Password         string
	Database         string
	UserAgent        string
	RetentionPolicy  string
	WriteConsistency string
	Timeout          internal.Duration
	UDPPayload       int               `toml:"udp_payload"`
	HTTPProxy        string            `toml:"http_proxy"`
	HTTPHeaders      map[string]string `toml:"http_headers"`
	ContentEncoding  string            `toml:"content_encoding"`

	// Path to CA file
	SSLCA string `toml:"ssl_ca"`
	// Path to host cert file
	SSLCert string `toml:"ssl_cert"`
	// Path to cert key file
	SSLKey string `toml:"ssl_key"`
	// Use SSL but skip chain & host verification
	InsecureSkipVerify bool

	Precision string // precision deprecated in 1.0; value is ignored

	// clients holds one entry per successfully configured URL; it is filled
	// by Connect and read by Write.
	clients []Client

	// CreateHTTPClientF and CreateUDPClientF construct the transport clients.
	// They are set in init and are fields so that they can be replaced
	// (presumably by tests).
	CreateHTTPClientF func(config *HTTPConfig) (Client, error)
	CreateUDPClientF  func(config *UDPConfig) (Client, error)

	// serializer is shared by every client created in Connect.
	serializer *influx.Serializer
}

var sampleConfig = `
  ## The full HTTP or UDP URL for your InfluxDB instance.
  ##
  ## Multiple URLs can be specified for a single cluster, only ONE of the
  ## urls will be written to each interval.
  # urls = ["udp://127.0.0.1:8089"]
  # urls = ["http://127.0.0.1:8086"]

  ## The target database for metrics; will be created as needed.
  # database = "telegraf"

  ## Name of existing retention policy to write to. Empty string writes to
  ## the default retention policy.
  # retention_policy = ""

  ## Write consistency (clusters only), can be: "any", "one", "quorum", "all"
  # write_consistency = "any"

  ## Timeout for HTTP messages.
  # timeout = "5s"

  ## HTTP Basic Auth
  # username = "telegraf"
  # password = "metricsmetricsmetricsmetrics"

  ## HTTP User-Agent
  # user_agent = "telegraf"

  ## UDP payload size is the maximum packet size to send.
  # udp_payload = 512

  ## Optional SSL Config
  # ssl_ca = "/etc/telegraf/ca.pem"
  # ssl_cert = "/etc/telegraf/cert.pem"
  # ssl_key = "/etc/telegraf/key.pem"
  ## Use SSL but skip chain & host verification
  # insecure_skip_verify = false

  ## HTTP Proxy override, if unset values the standard proxy environment
  ## variables are consulted to determine which proxy, if any, should be used.
  # http_proxy = "http://corporate.proxy:3128"

  ## Additional HTTP headers
  # http_headers = {"X-Special-Header" = "Special-Value"}

  ## HTTP Content-Encoding for write request body, can be set to "gzip" to
  ## compress body or "identity" to apply no encoding.
  # content_encoding = "identity"
`

// Connect builds one client per configured URL and appends it to the plugin's
// client list. URLs come from `urls`, followed by the deprecated `url` option
// when set; if both are empty, defaultURL is used.
//
// It returns an error when a URL or the proxy URL cannot be parsed, when a URL
// uses a scheme other than udp/udp4/udp6/http/https, or when a client cannot
// be constructed.
func (i *InfluxDB) Connect() error {
	ctx := context.Background()

	urls := make([]string, 0, len(i.URLs))
	urls = append(urls, i.URLs...)
	if i.URL != "" {
		urls = append(urls, i.URL)
	}

	if len(urls) == 0 {
		urls = append(urls, defaultURL)
	}

	i.serializer = influx.NewSerializer()

	for _, u := range urls {
		// Keep the raw string in u: url.Parse returns a nil *url.URL on
		// failure, so the error message must be built from the input.
		parts, err := url.Parse(u)
		if err != nil {
			return fmt.Errorf("error parsing url [%s]: %v", u, err)
		}

		var proxy *url.URL
		if len(i.HTTPProxy) > 0 {
			proxy, err = url.Parse(i.HTTPProxy)
			if err != nil {
				// proxy is nil here; report the configured value instead.
				return fmt.Errorf("error parsing proxy_url [%s]: %v", i.HTTPProxy, err)
			}
		}

		switch parts.Scheme {
		case "udp", "udp4", "udp6":
			c, err := i.udpClient(parts)
			if err != nil {
				return err
			}

			i.clients = append(i.clients, c)
		case "http", "https":
			c, err := i.httpClient(ctx, parts, proxy)
			if err != nil {
				return err
			}

			i.clients = append(i.clients, c)
		default:
			return fmt.Errorf("unsupported scheme [%s]: %q", parts, parts.Scheme)
		}
	}

	return nil
}

// Close is a no-op; it always returns nil.
func (i *InfluxDB) Close() error {
	return nil
}

// Description returns a one-line description of the plugin.
func (i *InfluxDB) Description() string {
	return "Configuration for sending metrics to InfluxDB"
}

// SampleConfig returns the commented sample configuration for the plugin.
func (i *InfluxDB) SampleConfig() string {
	return sampleConfig
}

// Write sends metrics to one of the configured servers, trying them in random
// order and returning nil on the first success. Every failed attempt is
// logged. When a server reports that the database was not found, Write asks
// that server to create it (the write itself is not retried against that
// server in this call). If all servers fail, an error is returned.
func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
	ctx := context.Background()

	for _, n := range rand.Perm(len(i.clients)) {
		client := i.clients[n]

		err := client.Write(ctx, metrics)
		if err == nil {
			return nil
		}

		if apiError, ok := err.(APIError); ok && apiError.Type == DatabaseNotFound {
			if createErr := client.CreateDatabase(ctx); createErr != nil {
				log.Printf("E! [outputs.influxdb] when writing to [%s]: database %q not found and failed to recreate",
					client.URL(), client.Database())
			}
		}

		log.Printf("E! [outputs.influxdb]: when writing to [%s]: %v", client.URL(), err)
	}

	return errors.New("could not write any address")
}

// udpClient builds a UDP client for url using the plugin's payload size and
// serializer.
func (i *InfluxDB) udpClient(url *url.URL) (Client, error) {
	config := &UDPConfig{
		URL:            url,
		MaxPayloadSize: i.UDPPayload,
		Serializer:     i.serializer,
	}

	c, err := i.CreateUDPClientF(config)
	if err != nil {
		return nil, fmt.Errorf("error creating UDP client [%s]: %v", url, err)
	}

	return c, nil
}

// httpClient builds an HTTP client for url from the plugin settings and then
// asks the server to create the database. A failed database creation does not
// fail the call: the client is still returned, and the failure is logged as a
// warning unless it is an APIError with status code 503.
func (i *InfluxDB) httpClient(ctx context.Context, url *url.URL, proxy *url.URL) (Client, error) {
	tlsConfig, err := internal.GetTLSConfig(
		i.SSLCert, i.SSLKey, i.SSLCA, i.InsecureSkipVerify)
	if err != nil {
		return nil, err
	}

	config := &HTTPConfig{
		URL:             url,
		Timeout:         i.Timeout.Duration,
		TLSConfig:       tlsConfig,
		UserAgent:       i.UserAgent,
		Username:        i.Username,
		Password:        i.Password,
		Proxy:           proxy,
		ContentEncoding: i.ContentEncoding,
		Headers:         i.HTTPHeaders,
		Database:        i.Database,
		RetentionPolicy: i.RetentionPolicy,
		Consistency:     i.WriteConsistency,
		Serializer:      i.serializer,
	}

	c, err := i.CreateHTTPClientF(config)
	if err != nil {
		return nil, fmt.Errorf("error creating HTTP client [%s]: %v", url, err)
	}

	if createErr := c.CreateDatabase(ctx); createErr != nil {
		// A 503 from the server is tolerated silently.
		if apiErr, ok := createErr.(APIError); ok && apiErr.StatusCode == 503 {
			return c, nil
		}

		log.Printf("W! [outputs.influxdb] when writing to [%s]: database %q creation failed: %v",
			c.URL(), c.Database(), createErr)
	}

	return c, nil
}

// init registers the plugin under the name "influxdb" with a 5 second default
// timeout and the standard HTTP and UDP client constructors.
func init() {
	outputs.Add("influxdb", func() telegraf.Output {
		return &InfluxDB{
			Timeout: internal.Duration{Duration: time.Second * 5},
			CreateHTTPClientF: func(config *HTTPConfig) (Client, error) {
				return NewHTTPClient(config)
			},
			CreateUDPClientF: func(config *UDPConfig) (Client, error) {
				return NewUDPClient(config)
			},
		}
	})
}