parent
a7ed46160a
commit
3c7c8926fb
|
@ -1,6 +1,11 @@
|
|||
## v0.1.9 [unreleased]
|
||||
|
||||
### Release Notes
|
||||
- InfluxDB output config change: `url` is now `urls`, and is a list. Config files
|
||||
will still be backwards compatible if only `url` is specified.
|
||||
|
||||
### Features
|
||||
- [#143](https://github.com/influxdb/telegraf/issues/143): InfluxDB clustering support
|
||||
|
||||
### Bugfixes
|
||||
- [#170](https://github.com/influxdb/telegraf/issues/170): Systemd support
|
||||
|
|
3
agent.go
3
agent.go
|
@ -84,6 +84,9 @@ func NewAgent(config *Config) (*Agent, error) {
|
|||
// Connect connects to all configured outputs
|
||||
func (a *Agent) Connect() error {
|
||||
for _, o := range a.outputs {
|
||||
if a.Debug {
|
||||
log.Printf("Attempting connection to output: %s\n", o.name)
|
||||
}
|
||||
err := o.output.Connect()
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
@ -37,7 +37,7 @@
|
|||
[outputs]
|
||||
[outputs.influxdb]
|
||||
# The full HTTP endpoint URL for your InfluxDB instance
|
||||
url = "http://localhost:8086" # required.
|
||||
urls = ["http://localhost:8086"] # required.
|
||||
|
||||
# The target database for metrics. This database must already exist
|
||||
database = "telegraf" # required.
|
||||
|
|
|
@ -1,8 +1,10 @@
|
|||
package influxdb
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"math/rand"
|
||||
"net/url"
|
||||
"strings"
|
||||
|
||||
|
@ -12,19 +14,23 @@ import (
|
|||
)
|
||||
|
||||
type InfluxDB struct {
|
||||
// URL is only for backwards compatability
|
||||
URL string
|
||||
URLs []string `toml:"urls"`
|
||||
Username string
|
||||
Password string
|
||||
Database string
|
||||
UserAgent string
|
||||
Timeout t.Duration
|
||||
|
||||
conn *client.Client
|
||||
conns []*client.Client
|
||||
}
|
||||
|
||||
var sampleConfig = `
|
||||
# The full HTTP endpoint URL for your InfluxDB instance
|
||||
url = "http://localhost:8086" # required.
|
||||
# Multiple urls can be specified for InfluxDB cluster support. Server to
|
||||
# write to will be randomly chosen each interval.
|
||||
urls = ["http://localhost:8086"] # required.
|
||||
|
||||
# The target database for metrics. This database must already exist
|
||||
database = "telegraf" # required.
|
||||
|
@ -42,33 +48,58 @@ var sampleConfig = `
|
|||
`
|
||||
|
||||
func (i *InfluxDB) Connect() error {
|
||||
u, err := url.Parse(i.URL)
|
||||
if err != nil {
|
||||
return err
|
||||
var urls []*url.URL
|
||||
for _, URL := range i.URLs {
|
||||
u, err := url.Parse(URL)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
urls = append(urls, u)
|
||||
}
|
||||
|
||||
c, err := client.NewClient(client.Config{
|
||||
URL: *u,
|
||||
Username: i.Username,
|
||||
Password: i.Password,
|
||||
UserAgent: i.UserAgent,
|
||||
Timeout: i.Timeout.Duration,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
// Backward-compatability with single Influx URL config files
|
||||
// This could eventually be removed in favor of specifying the urls as a list
|
||||
if i.URL != "" {
|
||||
u, err := url.Parse(i.URL)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
urls = append(urls, u)
|
||||
}
|
||||
|
||||
_, err = c.Query(client.Query{
|
||||
Command: fmt.Sprintf("CREATE DATABASE %s", i.Database),
|
||||
})
|
||||
|
||||
if err != nil && !strings.Contains(err.Error(), "database already exists") {
|
||||
log.Fatal(err)
|
||||
var conns []*client.Client
|
||||
for _, parsed_url := range urls {
|
||||
c, err := client.NewClient(client.Config{
|
||||
URL: *parsed_url,
|
||||
Username: i.Username,
|
||||
Password: i.Password,
|
||||
UserAgent: i.UserAgent,
|
||||
Timeout: i.Timeout.Duration,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
conns = append(conns, c)
|
||||
}
|
||||
|
||||
i.conn = c
|
||||
return nil
|
||||
// This will get set to nil if a successful connection is made
|
||||
err := errors.New("Could not create database on any server")
|
||||
|
||||
for _, conn := range conns {
|
||||
_, e := conn.Query(client.Query{
|
||||
Command: fmt.Sprintf("CREATE DATABASE %s", i.Database),
|
||||
})
|
||||
|
||||
if e != nil && !strings.Contains(e.Error(), "database already exists") {
|
||||
log.Println("ERROR: " + e.Error())
|
||||
} else {
|
||||
err = nil
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
i.conns = conns
|
||||
return err
|
||||
}
|
||||
|
||||
func (i *InfluxDB) Close() error {
|
||||
|
@ -84,12 +115,24 @@ func (i *InfluxDB) Description() string {
|
|||
return "Configuration for influxdb server to send metrics to"
|
||||
}
|
||||
|
||||
// Choose a random server in the cluster to write to until a successful write
|
||||
// occurs, logging each unsuccessful. If all servers fail, return error.
|
||||
func (i *InfluxDB) Write(bp client.BatchPoints) error {
|
||||
bp.Database = i.Database
|
||||
if _, err := i.conn.Write(bp); err != nil {
|
||||
return err
|
||||
|
||||
// This will get set to nil if a successful write occurs
|
||||
err := errors.New("Could not write to any InfluxDB server in cluster")
|
||||
|
||||
p := rand.Perm(len(i.conns))
|
||||
for _, n := range p {
|
||||
if _, e := i.conns[n].Write(bp); e != nil {
|
||||
log.Println("ERROR: " + e.Error())
|
||||
} else {
|
||||
err = nil
|
||||
break
|
||||
}
|
||||
}
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
|
||||
func init() {
|
||||
|
|
Loading…
Reference in New Issue