Use the UDP client for writing to InfluxDB

This commit is contained in:
Cameron Sparr 2015-11-10 14:19:32 -07:00
parent 019585f0db
commit e10394ba3b
1 changed files with 52 additions and 39 deletions

View File

@ -15,21 +15,23 @@ import (
type InfluxDB struct { type InfluxDB struct {
// URL is only for backwards compatability // URL is only for backwards compatability
URL string URL string
URLs []string `toml:"urls"` URLs []string `toml:"urls"`
Username string Username string
Password string Password string
Database string Database string
UserAgent string UserAgent string
Precision string Precision string
Timeout internal.Duration Timeout internal.Duration
UDPPayload int
conns []client.Client conns []client.Client
} }
var sampleConfig = ` var sampleConfig = `
# The full HTTP endpoint URL for your InfluxDB instance # The full HTTP or UDP endpoint URL for your InfluxDB instance
# Multiple urls can be specified for InfluxDB cluster support. # Multiple urls can be specified for InfluxDB cluster support.
# urls = ["udp://localhost:8089"] # UDP endpoint example
urls = ["http://localhost:8086"] # required urls = ["http://localhost:8086"] # required
# The target database for metrics (telegraf will create it if not exists) # The target database for metrics (telegraf will create it if not exists)
database = "telegraf" # required database = "telegraf" # required
@ -42,51 +44,62 @@ var sampleConfig = `
# timeout = "5s" # timeout = "5s"
# username = "telegraf" # username = "telegraf"
# password = "metricsmetricsmetricsmetrics" # password = "metricsmetricsmetricsmetrics"
# Set the user agent for the POSTs (can be useful for log differentiation) # Set the user agent for HTTP POSTs (can be useful for log differentiation)
# user_agent = "telegraf" # user_agent = "telegraf"
# Set UDP payload size, defaults to InfluxDB UDP Client default (512 bytes)
# udp_payload = 512
` `
func (i *InfluxDB) Connect() error { func (i *InfluxDB) Connect() error {
var urls []*url.URL var urls []string
for _, URL := range i.URLs { for _, u := range i.URLs {
u, err := url.Parse(URL)
if err != nil {
return err
}
urls = append(urls, u) urls = append(urls, u)
} }
// Backward-compatability with single Influx URL config files // Backward-compatability with single Influx URL config files
// This could eventually be removed in favor of specifying the urls as a list // This could eventually be removed in favor of specifying the urls as a list
if i.URL != "" { if i.URL != "" {
u, err := url.Parse(i.URL) urls = append(urls, i.URL)
if err != nil {
return err
}
urls = append(urls, u)
} }
var conns []client.Client var conns []client.Client
for _, parsed_url := range urls { for _, u := range urls {
c := client.NewClient(client.Config{ switch {
URL: parsed_url, case strings.HasPrefix(u, "udp"):
Username: i.Username, if i.UDPPayload == 0 {
Password: i.Password, i.UDPPayload = client.UDPPayloadSize
UserAgent: i.UserAgent, }
Timeout: i.Timeout.Duration, c, err := client.NewUDPClient(client.UDPConfig{
}) Addr: parsed_url.Host,
conns = append(conns, c) PayloadSize: i.UDPPayload,
} })
if err != nil {
return err
}
conns = append(conns, c)
default:
// If URL doesn't start with "udp", assume HTTP client
c, err := client.NewHTTPClient(client.HTTPConfig{
Addr: parsed_url.String(),
Username: i.Username,
Password: i.Password,
UserAgent: i.UserAgent,
Timeout: i.Timeout.Duration,
})
if err != nil {
return err
}
for _, conn := range conns { // Create Database if it doesn't exist
_, e := conn.Query(client.Query{ _, e := c.Query(client.Query{
Command: fmt.Sprintf("CREATE DATABASE %s", i.Database), Command: fmt.Sprintf("CREATE DATABASE %s", i.Database),
}) })
if e != nil && !strings.Contains(e.Error(), "database already exists") { if e != nil && !strings.Contains(e.Error(), "database already exists") {
log.Println("Database creation failed: " + e.Error()) log.Println("Database creation failed: " + e.Error())
} else { }
break
conns = append(conns, c)
} }
} }