[WIP] Etcd integration for configuration
This commit is contained in:
parent
8e041420cd
commit
c4d5fda188
1
Godeps
1
Godeps
|
@ -8,6 +8,7 @@ github.com/couchbase/go-couchbase cb664315a324d87d19c879d9cc67fda6be8c2ac1
|
||||||
github.com/couchbase/gomemcached a5ea6356f648fec6ab89add00edd09151455b4b2
|
github.com/couchbase/gomemcached a5ea6356f648fec6ab89add00edd09151455b4b2
|
||||||
github.com/couchbase/goutils 5823a0cbaaa9008406021dc5daf80125ea30bba6
|
github.com/couchbase/goutils 5823a0cbaaa9008406021dc5daf80125ea30bba6
|
||||||
github.com/dancannon/gorethink e7cac92ea2bc52638791a021f212145acfedb1fc
|
github.com/dancannon/gorethink e7cac92ea2bc52638791a021f212145acfedb1fc
|
||||||
|
github.com/coreos/etcd bdee27b19e8601ffd7bd4f0481abe9bbae04bd09
|
||||||
github.com/davecgh/go-spew 5215b55f46b2b919f50a1df0eaa5886afe4e3b3d
|
github.com/davecgh/go-spew 5215b55f46b2b919f50a1df0eaa5886afe4e3b3d
|
||||||
github.com/eapache/go-resiliency b86b1ec0dd4209a588dc1285cdd471e73525c0b3
|
github.com/eapache/go-resiliency b86b1ec0dd4209a588dc1285cdd471e73525c0b3
|
||||||
github.com/eapache/queue ded5959c0d4e360646dc9e9908cff48666781367
|
github.com/eapache/queue ded5959c0d4e360646dc9e9908cff48666781367
|
||||||
|
|
6
Makefile
6
Makefile
|
@ -71,6 +71,7 @@ endif
|
||||||
docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt
|
docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt
|
||||||
docker run --name riemann -p "5555:5555" -d blalor/riemann
|
docker run --name riemann -p "5555:5555" -d blalor/riemann
|
||||||
docker run --name snmp -p "31161:31161/udp" -d titilambert/snmpsim
|
docker run --name snmp -p "31161:31161/udp" -d titilambert/snmpsim
|
||||||
|
docker run --name etcd -p "2379:2379" -d quay.io/coreos/etcd -name etcd0 -advertise-client-urls http://localhost:2379 -listen-client-urls http://0.0.0.0:2379
|
||||||
|
|
||||||
# Run docker containers necessary for CircleCI unit tests
|
# Run docker containers necessary for CircleCI unit tests
|
||||||
docker-run-circle:
|
docker-run-circle:
|
||||||
|
@ -85,11 +86,12 @@ docker-run-circle:
|
||||||
docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt
|
docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt
|
||||||
docker run --name riemann -p "5555:5555" -d blalor/riemann
|
docker run --name riemann -p "5555:5555" -d blalor/riemann
|
||||||
docker run --name snmp -p "31161:31161/udp" -d titilambert/snmpsim
|
docker run --name snmp -p "31161:31161/udp" -d titilambert/snmpsim
|
||||||
|
docker run --name etcd -p "2379:2379" -d quay.io/coreos/etcd -name etcd0 -advertise-client-urls http://localhost:2379 -listen-client-urls http://0.0.0.0:2379
|
||||||
|
|
||||||
# Kill all docker containers, ignore errors
|
# Kill all docker containers, ignore errors
|
||||||
docker-kill:
|
docker-kill:
|
||||||
-docker kill nsq aerospike redis opentsdb rabbitmq postgres memcached mysql kafka mqtt riemann snmp
|
-docker kill nsq aerospike redis opentsdb rabbitmq postgres memcached mysql kafka mqtt riemann snmp etcd
|
||||||
-docker rm nsq aerospike redis opentsdb rabbitmq postgres memcached mysql kafka mqtt riemann snmp
|
-docker rm nsq aerospike redis opentsdb rabbitmq postgres memcached mysql kafka mqtt riemann snmp etcd
|
||||||
|
|
||||||
# Run full unit tests using docker containers (includes setup and teardown)
|
# Run full unit tests using docker containers (includes setup and teardown)
|
||||||
test: vet docker-kill docker-run
|
test: vet docker-kill docker-run
|
||||||
|
|
|
@ -11,6 +11,7 @@ import (
|
||||||
|
|
||||||
"github.com/influxdata/telegraf/agent"
|
"github.com/influxdata/telegraf/agent"
|
||||||
"github.com/influxdata/telegraf/internal/config"
|
"github.com/influxdata/telegraf/internal/config"
|
||||||
|
"github.com/influxdata/telegraf/internal/etcd"
|
||||||
"github.com/influxdata/telegraf/plugins/inputs"
|
"github.com/influxdata/telegraf/plugins/inputs"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/all"
|
_ "github.com/influxdata/telegraf/plugins/inputs/all"
|
||||||
"github.com/influxdata/telegraf/plugins/outputs"
|
"github.com/influxdata/telegraf/plugins/outputs"
|
||||||
|
@ -22,6 +23,13 @@ var fDebug = flag.Bool("debug", false,
|
||||||
var fQuiet = flag.Bool("quiet", false,
|
var fQuiet = flag.Bool("quiet", false,
|
||||||
"run in quiet mode")
|
"run in quiet mode")
|
||||||
var fTest = flag.Bool("test", false, "gather metrics, print them out, and exit")
|
var fTest = flag.Bool("test", false, "gather metrics, print them out, and exit")
|
||||||
|
var fEtcd = flag.String("etcd", "", "etcd urls where configuration is stored (comma separated)")
|
||||||
|
var fEtcdFolder = flag.String("etcdfolder", "/telegraf", "etcd root folder where configuration is stored")
|
||||||
|
var fEtcdSendConfigDir = flag.String("etcdwriteconfigdir", "", "store the following config dir to etcd")
|
||||||
|
var fEtcdSendConfig = flag.String("etcdwriteconfig", "", "store the following config file to etcd")
|
||||||
|
var fEtcdEraseConfig = flag.Bool("etcderaseconfig", false, "erase all telegraf config in etcd")
|
||||||
|
var fEtcdSendLabel = flag.String("etcdwritelabel", "", "store config file to etcd with this label")
|
||||||
|
var fEtcdReadLabels = flag.String("etcdreadlabels", "", "read config from etcd using labels (comma-separated)")
|
||||||
var fConfig = flag.String("config", "", "configuration file to load")
|
var fConfig = flag.String("config", "", "configuration file to load")
|
||||||
var fConfigDirectory = flag.String("config-directory", "",
|
var fConfigDirectory = flag.String("config-directory", "",
|
||||||
"directory containing additional *.conf files")
|
"directory containing additional *.conf files")
|
||||||
|
@ -62,6 +70,13 @@ The flags are:
|
||||||
-test gather metrics once, print them to stdout, and exit
|
-test gather metrics once, print them to stdout, and exit
|
||||||
-sample-config print out full sample configuration to stdout
|
-sample-config print out full sample configuration to stdout
|
||||||
-config-directory directory containing additional *.conf files
|
-config-directory directory containing additional *.conf files
|
||||||
|
-etcd etcd urls where configuration is stored (comma separated)
|
||||||
|
-etcdfolder etcd folder where configuration is stored and read
|
||||||
|
-etcdwriteconfigdir store the following config dir to etcd
|
||||||
|
-etcdwriteconfig store the following config file to etcd
|
||||||
|
-etcdwritelabel store config file to etcd with this label
|
||||||
|
-etcdreadlabels read config from etcd using labels (comma-separated)
|
||||||
|
-etcderaseconfig erase all telegraf config in etcd
|
||||||
-input-filter filter the input plugins to enable, separator is :
|
-input-filter filter the input plugins to enable, separator is :
|
||||||
-input-list print all the plugins inputs
|
-input-list print all the plugins inputs
|
||||||
-output-filter filter the output plugins to enable, separator is :
|
-output-filter filter the output plugins to enable, separator is :
|
||||||
|
@ -90,18 +105,46 @@ Examples:
|
||||||
`
|
`
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
reload := make(chan bool, 1)
|
// Read flags
|
||||||
reload <- true
|
|
||||||
for <-reload {
|
|
||||||
reload <- false
|
|
||||||
flag.Usage = func() { usageExit(0) }
|
flag.Usage = func() { usageExit(0) }
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
args := flag.Args()
|
args := flag.Args()
|
||||||
|
|
||||||
if flag.NFlag() == 0 && len(args) == 0 {
|
if flag.NFlag() == 0 && len(args) == 0 {
|
||||||
usageExit(0)
|
usageExit(0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Prepare signals handling
|
||||||
|
reload := make(chan bool, 1)
|
||||||
|
reload <- true
|
||||||
|
shutdown := make(chan struct{})
|
||||||
|
signals := make(chan os.Signal)
|
||||||
|
signal.Notify(signals, os.Interrupt, syscall.SIGHUP)
|
||||||
|
|
||||||
|
// Prepare etcd if needed
|
||||||
|
var e *etcd.EtcdClient
|
||||||
|
if *fEtcd != "" {
|
||||||
|
e = etcd.NewEtcdClient(*fEtcd, *fEtcdFolder)
|
||||||
|
if *fEtcdSendConfig == "" && *fEtcdSendLabel == "" && *fEtcdSendConfigDir == "" {
|
||||||
|
go e.LaunchWatcher(shutdown, signals)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle signals
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
sig := <-signals
|
||||||
|
if sig == os.Interrupt {
|
||||||
|
close(shutdown)
|
||||||
|
} else if sig == syscall.SIGHUP {
|
||||||
|
log.Print("Reloading Telegraf config\n")
|
||||||
|
<-reload
|
||||||
|
reload <- true
|
||||||
|
close(shutdown)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Prepare inputs
|
||||||
var inputFilters []string
|
var inputFilters []string
|
||||||
if *fInputFiltersLegacy != "" {
|
if *fInputFiltersLegacy != "" {
|
||||||
inputFilter := strings.TrimSpace(*fInputFiltersLegacy)
|
inputFilter := strings.TrimSpace(*fInputFiltersLegacy)
|
||||||
|
@ -112,6 +155,7 @@ func main() {
|
||||||
inputFilters = strings.Split(":"+inputFilter+":", ":")
|
inputFilters = strings.Split(":"+inputFilter+":", ":")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Prepare outputs
|
||||||
var outputFilters []string
|
var outputFilters []string
|
||||||
if *fOutputFiltersLegacy != "" {
|
if *fOutputFiltersLegacy != "" {
|
||||||
outputFilter := strings.TrimSpace(*fOutputFiltersLegacy)
|
outputFilter := strings.TrimSpace(*fOutputFiltersLegacy)
|
||||||
|
@ -150,17 +194,20 @@ func main() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Print version
|
||||||
if *fVersion {
|
if *fVersion {
|
||||||
v := fmt.Sprintf("Telegraf - Version %s", Version)
|
v := fmt.Sprintf("Telegraf - Version %s", Version)
|
||||||
fmt.Println(v)
|
fmt.Println(v)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Print sample config
|
||||||
if *fSampleConfig {
|
if *fSampleConfig {
|
||||||
config.PrintSampleConfig(inputFilters, outputFilters)
|
config.PrintSampleConfig(inputFilters, outputFilters)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Print usage
|
||||||
if *fUsage != "" {
|
if *fUsage != "" {
|
||||||
if err := config.PrintInputConfig(*fUsage); err != nil {
|
if err := config.PrintInputConfig(*fUsage); err != nil {
|
||||||
if err2 := config.PrintOutputConfig(*fUsage); err2 != nil {
|
if err2 := config.PrintOutputConfig(*fUsage); err2 != nil {
|
||||||
|
@ -170,12 +217,72 @@ func main() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for <-reload {
|
||||||
|
// Reset signal handler vars
|
||||||
|
shutdown = make(chan struct{})
|
||||||
|
reload <- false
|
||||||
|
|
||||||
|
// Prepare config
|
||||||
var (
|
var (
|
||||||
c *config.Config
|
c *config.Config
|
||||||
err error
|
err error
|
||||||
)
|
)
|
||||||
|
|
||||||
if *fConfig != "" {
|
if *fEtcd != "" {
|
||||||
|
c = config.NewConfig()
|
||||||
|
c.OutputFilters = outputFilters
|
||||||
|
c.InputFilters = inputFilters
|
||||||
|
|
||||||
|
if *fEtcdSendConfigDir != "" {
|
||||||
|
// TODO check config format before write it
|
||||||
|
// Erase config in etcd
|
||||||
|
if *fEtcdEraseConfig {
|
||||||
|
err = e.DeleteConfig("")
|
||||||
|
if err != nil {
|
||||||
|
err = fmt.Errorf("Error erasing Telegraf Etcd Config: %s", err)
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Write config dir to etcd
|
||||||
|
err = c.LoadDirectory(*fEtcdSendConfigDir)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
err = e.WriteConfigDir(*fEtcdSendConfigDir)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
} else if *fEtcdSendConfig != "" && *fEtcdSendLabel != "" {
|
||||||
|
// TODO check config format before write it
|
||||||
|
// Write config to etcd
|
||||||
|
err = c.LoadConfig(*fEtcdSendConfig)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
err = e.WriteLabelConfig(*fEtcdSendLabel, *fEtcdSendConfig)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
} else if *fEtcdEraseConfig {
|
||||||
|
// Erase config in etcd
|
||||||
|
err = e.DeleteConfig("")
|
||||||
|
if err != nil {
|
||||||
|
err = fmt.Errorf("Error erasing Telegraf Etcd Config: %s", err)
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
} else {
|
||||||
|
// Read config to etcd
|
||||||
|
log.Printf("Config read from etcd with labels %s\n", *fEtcdReadLabels)
|
||||||
|
c, err = e.ReadConfig(c, *fEtcdReadLabels)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if *fConfig != "" {
|
||||||
|
// Read config from file
|
||||||
c = config.NewConfig()
|
c = config.NewConfig()
|
||||||
c.OutputFilters = outputFilters
|
c.OutputFilters = outputFilters
|
||||||
c.InputFilters = inputFilters
|
c.InputFilters = inputFilters
|
||||||
|
@ -188,6 +295,7 @@ func main() {
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Read config dir
|
||||||
if *fConfigDirectoryLegacy != "" {
|
if *fConfigDirectoryLegacy != "" {
|
||||||
err = c.LoadDirectory(*fConfigDirectoryLegacy)
|
err = c.LoadDirectory(*fConfigDirectoryLegacy)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -195,12 +303,14 @@ func main() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Read config dir
|
||||||
if *fConfigDirectory != "" {
|
if *fConfigDirectory != "" {
|
||||||
err = c.LoadDirectory(*fConfigDirectory)
|
err = c.LoadDirectory(*fConfigDirectory)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// check config
|
||||||
if len(c.Outputs) == 0 {
|
if len(c.Outputs) == 0 {
|
||||||
log.Fatalf("Error: no outputs found, did you provide a valid config file?")
|
log.Fatalf("Error: no outputs found, did you provide a valid config file?")
|
||||||
}
|
}
|
||||||
|
@ -234,22 +344,6 @@ func main() {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
shutdown := make(chan struct{})
|
|
||||||
signals := make(chan os.Signal)
|
|
||||||
signal.Notify(signals, os.Interrupt, syscall.SIGHUP)
|
|
||||||
go func() {
|
|
||||||
sig := <-signals
|
|
||||||
if sig == os.Interrupt {
|
|
||||||
close(shutdown)
|
|
||||||
}
|
|
||||||
if sig == syscall.SIGHUP {
|
|
||||||
log.Printf("Reloading Telegraf config\n")
|
|
||||||
<-reload
|
|
||||||
reload <- true
|
|
||||||
close(shutdown)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
log.Printf("Starting Telegraf (version %s)\n", Version)
|
log.Printf("Starting Telegraf (version %s)\n", Version)
|
||||||
log.Printf("Loaded outputs: %s", strings.Join(c.OutputNames(), " "))
|
log.Printf("Loaded outputs: %s", strings.Join(c.OutputNames(), " "))
|
||||||
log.Printf("Loaded inputs: %s", strings.Join(c.InputNames(), " "))
|
log.Printf("Loaded inputs: %s", strings.Join(c.InputNames(), " "))
|
||||||
|
|
|
@ -240,3 +240,194 @@ Outputs also support the same configurable options as inputs
|
||||||
[outputs.influxdb.tagpass]
|
[outputs.influxdb.tagpass]
|
||||||
cpu = ["cpu0"]
|
cpu = ["cpu0"]
|
||||||
```
|
```
|
||||||
|
|
||||||
|
## Etcd Configuration
|
||||||
|
|
||||||
|
### Command line parameters
|
||||||
|
|
||||||
|
Telegraf option related to Etcd:
|
||||||
|
```
|
||||||
|
-etcd etcd urls where configuration is stored (comma separated)
|
||||||
|
-etcdfolder etcd folder where configuration is stored and read
|
||||||
|
-etcdwriteconfigdir store the following config dir to etcd
|
||||||
|
-etcdwriteconfig store the following config file to etcd
|
||||||
|
-etcderaseconfig erase all telegraf config in etcd
|
||||||
|
-etcdwritelabel store config file to etcd with this label
|
||||||
|
-etcdreadlabels read config from etcd using labels (comma-separated)
|
||||||
|
```
|
||||||
|
|
||||||
|
### Features
|
||||||
|
|
||||||
|
* Main config file can be loaded in Etcd
|
||||||
|
* Each agent try automatically to find its own key in Etcd (/telegraf/hosts/HOSTNAME)
|
||||||
|
* Labels can be configured in config files, so labels could be from Etcd
|
||||||
|
* Etcd config watcher: that reload Telegraf when a change is detected in Etcd.
|
||||||
|
* You can write all configuration of ALL your Telegraf agent in a folder then send it to Etcd.
|
||||||
|
|
||||||
|
### Put simple configuration in Etcd
|
||||||
|
|
||||||
|
|
||||||
|
#### Create main configuration file
|
||||||
|
|
||||||
|
Create a `myconf.conf` file, which will be stored in Etcd, with the following content:
|
||||||
|
|
||||||
|
```
|
||||||
|
[tags]
|
||||||
|
dc = "us-east-1"
|
||||||
|
|
||||||
|
[agent]
|
||||||
|
interval = "10s"
|
||||||
|
round_interval = true
|
||||||
|
flush_interval = "10s"
|
||||||
|
flush_jitter = "0s"
|
||||||
|
debug = false
|
||||||
|
hostname = ""
|
||||||
|
|
||||||
|
|
||||||
|
[[outputs.influxdb]]
|
||||||
|
urls = ["http://localhost:8086"]
|
||||||
|
database = "telegraf"
|
||||||
|
precision = "s"
|
||||||
|
|
||||||
|
|
||||||
|
[[inputs.cpu]]
|
||||||
|
percpu = true
|
||||||
|
totalcpu = true
|
||||||
|
drop = ["cpu_time*"]
|
||||||
|
```
|
||||||
|
|
||||||
|
#### Send the configuration to Etcd
|
||||||
|
|
||||||
|
Send this file to Etcd using the label **mylabel**
|
||||||
|
|
||||||
|
```
|
||||||
|
$ telegraf -etcd http://127.0.0.1:2379 -etcdwritelabel mylabel -etcdwriteconfig myconf.conf
|
||||||
|
2016/03/08 19:50:27 Config written with key /telegraf/labels/mylabel
|
||||||
|
```
|
||||||
|
|
||||||
|
NOTE: By default, configuration is stored in the folder `/telegraf` in Etcd.
|
||||||
|
You can change it using `-etcdfolder` flag
|
||||||
|
|
||||||
|
|
||||||
|
#### Check if your configuration in Etcd
|
||||||
|
|
||||||
|
You can check if data is really written in Etcd with
|
||||||
|
|
||||||
|
```
|
||||||
|
$ etcdctl get /telegraf/labels/mylabel
|
||||||
|
[tags]
|
||||||
|
dc = "us-east-1"
|
||||||
|
|
||||||
|
[agent]
|
||||||
|
interval = "10s"
|
||||||
|
round_interval = true
|
||||||
|
flush_interval = "10s"
|
||||||
|
flush_jitter = "0s"
|
||||||
|
debug = false
|
||||||
|
hostname = ""
|
||||||
|
|
||||||
|
|
||||||
|
[[outputs.influxdb]]
|
||||||
|
urls = ["http://localhost:8086"]
|
||||||
|
database = "telegraf"
|
||||||
|
precision = "s"
|
||||||
|
|
||||||
|
|
||||||
|
[[inputs.cpu]]
|
||||||
|
percpu = true
|
||||||
|
totalcpu = true
|
||||||
|
drop = ["cpu_time*"]
|
||||||
|
```
|
||||||
|
|
||||||
|
####
|
||||||
|
|
||||||
|
Now any telegraf agent can load this config use the label **mylabel**
|
||||||
|
|
||||||
|
```
|
||||||
|
$ telegraf -etcd http://127.0.0.1:2379 -etcdreadlabels mylabel
|
||||||
|
Config read with label mylabel
|
||||||
|
2016/03/08 19:54:52 WARNING: [etcd] 100: Key not found (/telegraf/main) [245]
|
||||||
|
2016/03/08 19:54:52 WARNING: [etcd] 100: Key not found (/telegraf/hosts) [245]
|
||||||
|
2016/03/08 19:54:52 Database creation failed: Get http://localhost:8086/query?db=&q=CREATE+DATABASE+IF+NOT+EXISTS+telegraf: dial tcp [::1]:8086: getsockopt: connection refused
|
||||||
|
2016/03/08 19:54:52 Starting Telegraf (version 0.10.4.1-49-g07a3a07)
|
||||||
|
2016/03/08 19:54:52 Loaded outputs: influxdb
|
||||||
|
2016/03/08 19:54:52 Loaded inputs: cpu
|
||||||
|
2016/03/08 19:54:52 Tags enabled: dc=us-east-1 host=hostname
|
||||||
|
2016/03/08 19:54:52 Agent Config: Interval:10s, Debug:false, Quiet:false, Hostname:"hostname", Flush Interval:10s
|
||||||
|
```
|
||||||
|
|
||||||
|
NOTE: By default, configuration is read in the folder `/telegraf` in Etcd.
|
||||||
|
You can change it using `-etcdfolder` flag
|
||||||
|
|
||||||
|
|
||||||
|
#### Read configuration from Etcd
|
||||||
|
|
||||||
|
1. `/telegraf/main` key
|
||||||
|
2. `/telegraf/hosts/HOSTNAME` key
|
||||||
|
3. `/telegraf/labels/LABEL1` key
|
||||||
|
4. `/telegraf/labels/LABEL2` key
|
||||||
|
|
||||||
|
(First means less importance)
|
||||||
|
|
||||||
|
### Put configuration folder in Etcd
|
||||||
|
|
||||||
|
#### Create configuration folder structure
|
||||||
|
|
||||||
|
You can set your configuration in a folder like this:
|
||||||
|
|
||||||
|
```
|
||||||
|
testdata/test1
|
||||||
|
├── hosts
|
||||||
|
│ ├── myserver1.conf
|
||||||
|
│ └── myserver2.conf
|
||||||
|
├── labels
|
||||||
|
│ ├── influx.conf
|
||||||
|
│ ├── network2.conf
|
||||||
|
│ └── network.conf
|
||||||
|
└── main.conf
|
||||||
|
```
|
||||||
|
|
||||||
|
The following table how files and folder are stored in Etcd:
|
||||||
|
|
||||||
|
| Files | Etcd | Details
|
||||||
|
|-------------------------|---------------------|-----------------------------------------------------------------------|
|
||||||
|
| | `/telegraf ` | |
|
||||||
|
| `/main.conf` | `├── main ` | Equivalent to main config file load with `-config` option |
|
||||||
|
| | `├── hosts/ ` | |
|
||||||
|
| `/host/myserver1.conf` | `│ ├── myserver1` | Configuration specific for Telegraf agent with hostname **myserver1** |
|
||||||
|
| `/host/myserver2.conf` | `│ └── myserver2` | Configuration specific for Telegraf agent with hostname **myserver2** |
|
||||||
|
| | `└── labels/ ` | |
|
||||||
|
| `/labels/influx.conf` | ` ├── influx ` | Configuration loaded by hosts with etcd label **influx** |
|
||||||
|
| `/labels/network2.conf` | ` ├── network2 ` | Configuration loaded by hosts with etcd label **network2** |
|
||||||
|
| `/labels/network.conf` | ` └── network ` | Configuration loaded by hosts with etcd label **network** |
|
||||||
|
|
||||||
|
|
||||||
|
(An example is available [here](../internal/etcd/testdata/test1))
|
||||||
|
|
||||||
|
#### Send configuration to Etcd
|
||||||
|
|
||||||
|
Then you can send your configuration folder to Etcd:
|
||||||
|
|
||||||
|
```
|
||||||
|
$ telegraf -etcd http://127.0.0.1:2379 -etcdwriteconfigdir internal/etcd/testdata/test1/
|
||||||
|
2016/03/08 20:14:28 Config written with key /telegraf/hosts/myserver1
|
||||||
|
2016/03/08 20:14:28 Config written with key /telegraf/labels/influx
|
||||||
|
2016/03/08 20:14:28 Config written with key /telegraf/labels/network
|
||||||
|
2016/03/08 20:14:28 Config written with key /telegraf/labels/network2
|
||||||
|
2016/03/08 20:14:28 Config written with key /telegraf/main
|
||||||
|
```
|
||||||
|
#### Read configuration from Etcd
|
||||||
|
|
||||||
|
Then you can start all your telegraf agents like this
|
||||||
|
|
||||||
|
```
|
||||||
|
$ telegraf -etcd http://127.0.0.1:2379 -etcdreadlabels=influx,network
|
||||||
|
2016/03/08 20:17:25 Config read from etcd with labels influx,network
|
||||||
|
2016/03/08 20:17:25 Database creation failed: Get http://localhost:8086/query?db=&q=CREATE+DATABASE+IF+NOT+EXISTS+telegraf: dial tcp [::1]:8086: getsockopt: connection refused
|
||||||
|
2016/03/08 20:17:25 Starting Telegraf (version 0.10.4.1-49-g07a3a07)
|
||||||
|
2016/03/08 20:17:25 Loaded outputs: influxdb
|
||||||
|
2016/03/08 20:17:25 Loaded inputs: net
|
||||||
|
2016/03/08 20:17:25 Tags enabled: dc=us-east-1 host=myserver1
|
||||||
|
2016/03/08 20:17:25 Agent Config: Interval:2s, Debug:false, Quiet:false, Hostname:"myserver1", Flush Interval:10s
|
||||||
|
2016/03/08 20:17:26 Gathered metrics, (2s interval), from 1 inputs in 1.382394ms
|
||||||
|
```
|
||||||
|
|
|
@ -57,6 +57,7 @@ func NewConfig() *Config {
|
||||||
Agent: &AgentConfig{
|
Agent: &AgentConfig{
|
||||||
Interval: internal.Duration{Duration: 10 * time.Second},
|
Interval: internal.Duration{Duration: 10 * time.Second},
|
||||||
RoundInterval: true,
|
RoundInterval: true,
|
||||||
|
Labels: make([]string, 0),
|
||||||
FlushInterval: internal.Duration{Duration: 10 * time.Second},
|
FlushInterval: internal.Duration{Duration: 10 * time.Second},
|
||||||
FlushJitter: internal.Duration{Duration: 5 * time.Second},
|
FlushJitter: internal.Duration{Duration: 5 * time.Second},
|
||||||
},
|
},
|
||||||
|
@ -116,6 +117,8 @@ type AgentConfig struct {
|
||||||
Quiet bool
|
Quiet bool
|
||||||
Hostname string
|
Hostname string
|
||||||
OmitHostname bool
|
OmitHostname bool
|
||||||
|
// Etcd labels
|
||||||
|
Labels []string
|
||||||
}
|
}
|
||||||
|
|
||||||
// Inputs returns a list of strings of the configured inputs.
|
// Inputs returns a list of strings of the configured inputs.
|
||||||
|
@ -410,40 +413,48 @@ func (c *Config) LoadConfig(path string) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Error parsing %s, %s", path, err)
|
return fmt.Errorf("Error parsing %s, %s", path, err)
|
||||||
}
|
}
|
||||||
|
err = c.LoadConfigFromTable(tbl)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf(err.Error(), path)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Config) LoadConfigFromTable(tbl *ast.Table) error {
|
||||||
|
var err error
|
||||||
for name, val := range tbl.Fields {
|
for name, val := range tbl.Fields {
|
||||||
subTable, ok := val.(*ast.Table)
|
subTable, ok := val.(*ast.Table)
|
||||||
if !ok {
|
if !ok {
|
||||||
return fmt.Errorf("%s: invalid configuration", path)
|
return fmt.Errorf("%%s: invalid configuration")
|
||||||
}
|
}
|
||||||
|
|
||||||
switch name {
|
switch name {
|
||||||
case "agent":
|
case "agent":
|
||||||
if err = config.UnmarshalTable(subTable, c.Agent); err != nil {
|
if err = config.UnmarshalTable(subTable, c.Agent); err != nil {
|
||||||
log.Printf("Could not parse [agent] config\n")
|
log.Printf("Could not parse [agent] config\n")
|
||||||
return fmt.Errorf("Error parsing %s, %s", path, err)
|
return fmt.Errorf("Error parsing %%s, %s", err)
|
||||||
}
|
}
|
||||||
case "global_tags", "tags":
|
case "global_tags", "tags":
|
||||||
if err = config.UnmarshalTable(subTable, c.Tags); err != nil {
|
if err = config.UnmarshalTable(subTable, c.Tags); err != nil {
|
||||||
log.Printf("Could not parse [global_tags] config\n")
|
log.Printf("Could not parse [global_tags] config\n")
|
||||||
return fmt.Errorf("Error parsing %s, %s", path, err)
|
return fmt.Errorf("Error parsing %%s, %s", err)
|
||||||
}
|
}
|
||||||
case "outputs":
|
case "outputs":
|
||||||
for pluginName, pluginVal := range subTable.Fields {
|
for pluginName, pluginVal := range subTable.Fields {
|
||||||
switch pluginSubTable := pluginVal.(type) {
|
switch pluginSubTable := pluginVal.(type) {
|
||||||
case *ast.Table:
|
case *ast.Table:
|
||||||
if err = c.addOutput(pluginName, pluginSubTable); err != nil {
|
if err = c.addOutput(pluginName, pluginSubTable); err != nil {
|
||||||
return fmt.Errorf("Error parsing %s, %s", path, err)
|
return fmt.Errorf("Error parsing %%s, %s", err)
|
||||||
}
|
}
|
||||||
case []*ast.Table:
|
case []*ast.Table:
|
||||||
for _, t := range pluginSubTable {
|
for _, t := range pluginSubTable {
|
||||||
if err = c.addOutput(pluginName, t); err != nil {
|
if err = c.addOutput(pluginName, t); err != nil {
|
||||||
return fmt.Errorf("Error parsing %s, %s", path, err)
|
return fmt.Errorf("Error parsing %%s, %s", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("Unsupported config format: %s, file %s",
|
return fmt.Errorf("Unsupported config format: %s, file %%s",
|
||||||
pluginName, path)
|
pluginName)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
case "inputs", "plugins":
|
case "inputs", "plugins":
|
||||||
|
@ -451,24 +462,24 @@ func (c *Config) LoadConfig(path string) error {
|
||||||
switch pluginSubTable := pluginVal.(type) {
|
switch pluginSubTable := pluginVal.(type) {
|
||||||
case *ast.Table:
|
case *ast.Table:
|
||||||
if err = c.addInput(pluginName, pluginSubTable); err != nil {
|
if err = c.addInput(pluginName, pluginSubTable); err != nil {
|
||||||
return fmt.Errorf("Error parsing %s, %s", path, err)
|
return fmt.Errorf("Error parsing %%s, %s", err)
|
||||||
}
|
}
|
||||||
case []*ast.Table:
|
case []*ast.Table:
|
||||||
for _, t := range pluginSubTable {
|
for _, t := range pluginSubTable {
|
||||||
if err = c.addInput(pluginName, t); err != nil {
|
if err = c.addInput(pluginName, t); err != nil {
|
||||||
return fmt.Errorf("Error parsing %s, %s", path, err)
|
return fmt.Errorf("Error parsing %%s, %s", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("Unsupported config format: %s, file %s",
|
return fmt.Errorf("Unsupported config format: %s, file %%s",
|
||||||
pluginName, path)
|
pluginName)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Assume it's an input input for legacy config file support if no other
|
// Assume it's an input input for legacy config file support if no other
|
||||||
// identifiers are present
|
// identifiers are present
|
||||||
default:
|
default:
|
||||||
if err = c.addInput(name, subTable); err != nil {
|
if err = c.addInput(name, subTable); err != nil {
|
||||||
return fmt.Errorf("Error parsing %s, %s", path, err)
|
return fmt.Errorf("Error parsing %%s, %s", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,262 @@
|
||||||
|
package etcd
|
||||||
|
|
||||||
|
import (
|
||||||
|
"golang.org/x/net/context"
|
||||||
|
"io/ioutil"
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
|
"path"
|
||||||
|
"strings"
|
||||||
|
"syscall"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/coreos/etcd/client"
|
||||||
|
influxconfig "github.com/influxdata/config"
|
||||||
|
"github.com/influxdata/telegraf/internal/config"
|
||||||
|
"github.com/influxdata/toml"
|
||||||
|
"github.com/influxdata/toml/ast"
|
||||||
|
)
|
||||||
|
|
||||||
|
type EtcdClient struct {
|
||||||
|
Kapi client.KeysAPI
|
||||||
|
Folder string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *EtcdClient) LaunchWatcher(shutdown chan struct{}, signals chan os.Signal) {
|
||||||
|
// TODO: All telegraf client will reload for each changes...
|
||||||
|
// Maybe we want to reload on those we need to ???
|
||||||
|
// So we need to create a watcher by labels ??
|
||||||
|
for {
|
||||||
|
watcherOpts := client.WatcherOptions{AfterIndex: 0, Recursive: true}
|
||||||
|
w := e.Kapi.Watcher(e.Folder, &watcherOpts)
|
||||||
|
r, err := w.Next(context.Background())
|
||||||
|
if err != nil {
|
||||||
|
// TODO What we have to do here ????
|
||||||
|
log.Fatal("Error occurred", err)
|
||||||
|
}
|
||||||
|
if r.Action == "set" || r.Action == "update" {
|
||||||
|
// do something with Response r here
|
||||||
|
log.Printf("Changes detected in etcd (%s action detected)\n", r.Action)
|
||||||
|
log.Print("Reloading Telegraf config\n")
|
||||||
|
signals <- syscall.SIGHUP
|
||||||
|
time.Sleep(time.Duration(1) * time.Second)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewEtcdClient(urls string, folder string) *EtcdClient {
|
||||||
|
splittedUrls := strings.Split(urls, ",")
|
||||||
|
// Create a new etcd client
|
||||||
|
cfg := client.Config{
|
||||||
|
Endpoints: splittedUrls,
|
||||||
|
Transport: client.DefaultTransport,
|
||||||
|
}
|
||||||
|
|
||||||
|
e := &EtcdClient{}
|
||||||
|
c, err := client.New(cfg)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
kapi := client.NewKeysAPI(c)
|
||||||
|
|
||||||
|
e.Kapi = kapi
|
||||||
|
e.Folder = folder
|
||||||
|
|
||||||
|
return e
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *EtcdClient) WriteConfigDir(configdir string) error {
|
||||||
|
directoryEntries, err := ioutil.ReadDir(configdir)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, entry := range directoryEntries {
|
||||||
|
name := entry.Name()
|
||||||
|
if entry.IsDir() {
|
||||||
|
if name == "labels" {
|
||||||
|
// Handle labels
|
||||||
|
directoryEntries, err := ioutil.ReadDir(path.Join(configdir, name))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, entry := range directoryEntries {
|
||||||
|
filename := entry.Name()
|
||||||
|
if len(filename) < 6 || filename[len(filename)-5:] != ".conf" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
label := filename[:len(filename)-5]
|
||||||
|
err = e.WriteLabelConfig(label, path.Join(configdir, name, filename))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if name == "hosts" {
|
||||||
|
// Handle hosts specific config
|
||||||
|
directoryEntries, err := ioutil.ReadDir(path.Join(configdir, name))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, entry := range directoryEntries {
|
||||||
|
filename := entry.Name()
|
||||||
|
if len(filename) < 6 || filename[len(filename)-5:] != ".conf" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
hostname := filename[:len(filename)-5]
|
||||||
|
err = e.WriteHostConfig(hostname, path.Join(configdir, name, filename))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if name == "main.conf" {
|
||||||
|
// Handle main config
|
||||||
|
err := e.WriteMainConfig(path.Join(configdir, name))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *EtcdClient) DeleteConfig(path string) error {
|
||||||
|
// removeWrite main config file in etcd
|
||||||
|
key := e.Folder + "/" + path
|
||||||
|
_, err := e.Kapi.Delete(context.Background(), key, &client.DeleteOptions{Recursive: true})
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *EtcdClient) WriteMainConfig(path string) error {
|
||||||
|
// Write main config file in etcd
|
||||||
|
key := "main"
|
||||||
|
err := e.WriteConfig(key, path)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *EtcdClient) WriteLabelConfig(label string, path string) error {
|
||||||
|
// Write label config file in etcd
|
||||||
|
key := "labels/" + label
|
||||||
|
err := e.WriteConfig(key, path)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *EtcdClient) WriteHostConfig(host string, path string) error {
|
||||||
|
// Write host config file in etcd
|
||||||
|
key := "hosts/" + host
|
||||||
|
err := e.WriteConfig(key, path)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *EtcdClient) WriteConfig(relative_key string, path string) error {
|
||||||
|
// Read config file, get conf in tomlformat, convert to json
|
||||||
|
// Then write it to etcd
|
||||||
|
// Read file
|
||||||
|
tbl, err := influxconfig.ParseFile(path)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// Get toml
|
||||||
|
toml_data := tbl.Source()
|
||||||
|
// Write it
|
||||||
|
key := e.Folder + "/" + relative_key
|
||||||
|
resp, _ := e.Kapi.Get(context.Background(), key, nil)
|
||||||
|
if resp == nil {
|
||||||
|
_, err = e.Kapi.Set(context.Background(), key, string(toml_data), nil)
|
||||||
|
} else {
|
||||||
|
_, err = e.Kapi.Update(context.Background(), key, string(toml_data))
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
return err
|
||||||
|
} else {
|
||||||
|
log.Printf("Config written with key %s\n", key)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//func (e *EtcdClient) ReadConfig(labels []string) (*config.Config, error) {
|
||||||
|
func (e *EtcdClient) ReadConfig(c *config.Config, labels string) (*config.Config, error) {
|
||||||
|
// Get default config in etcd
|
||||||
|
// key = /telegraf/default
|
||||||
|
key := e.Folder + "/main"
|
||||||
|
resp, err := e.Kapi.Get(context.Background(), key, nil)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("WARNING: [etcd] %s", err)
|
||||||
|
} else {
|
||||||
|
// Put it in toml
|
||||||
|
tbl, err := toml2table(resp)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("WARNING: [etcd] %s", err)
|
||||||
|
}
|
||||||
|
c.LoadConfigFromTable(tbl)
|
||||||
|
if err != nil {
|
||||||
|
log.Print(err, "")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get specific host config
|
||||||
|
// key = /telegraf/hosts/HOSTNAME
|
||||||
|
hostname, err := os.Hostname()
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("WARNING: [etcd] %s", err)
|
||||||
|
} else if hostname != "" {
|
||||||
|
key = e.Folder + "/hosts/" + hostname
|
||||||
|
resp, err := e.Kapi.Get(context.Background(), key, nil)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("WARNING: [etcd] %s", err)
|
||||||
|
} else {
|
||||||
|
// Put it in toml
|
||||||
|
tbl, err := toml2table(resp)
|
||||||
|
if err != nil {
|
||||||
|
log.Print(err)
|
||||||
|
}
|
||||||
|
c.LoadConfigFromTable(tbl)
|
||||||
|
if err != nil {
|
||||||
|
log.Print(err, "")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Concat labels from etcd and labels from command line
|
||||||
|
labels_list := c.Agent.Labels
|
||||||
|
if labels != "" {
|
||||||
|
labels_list = append(labels_list, strings.Split(labels, ",")...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Iterate on all labels
|
||||||
|
// TODO check label order of importance ?
|
||||||
|
for _, label := range labels_list {
|
||||||
|
// Read from etcd
|
||||||
|
// key = /telegraf/labels/LABEL
|
||||||
|
key := e.Folder + "/labels/" + label
|
||||||
|
resp, err := e.Kapi.Get(context.Background(), key, nil)
|
||||||
|
if err != nil {
|
||||||
|
log.Print(err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// Put it in toml
|
||||||
|
tbl, err := toml2table(resp)
|
||||||
|
if err != nil {
|
||||||
|
log.Print(err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// Load config
|
||||||
|
err = c.LoadConfigFromTable(tbl)
|
||||||
|
if err != nil {
|
||||||
|
log.Print(err, "")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return c, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func toml2table(resp *client.Response) (*ast.Table, error) {
|
||||||
|
// Convert json to toml
|
||||||
|
return toml.Parse([]byte(resp.Node.Value))
|
||||||
|
}
|
|
@ -0,0 +1,195 @@
|
||||||
|
package etcd
|
||||||
|
|
||||||
|
import (
|
||||||
|
"golang.org/x/net/context"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
"syscall"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf/internal/config"
|
||||||
|
"github.com/influxdata/telegraf/plugins/inputs"
|
||||||
|
"github.com/influxdata/telegraf/plugins/outputs"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf/plugins/inputs/system"
|
||||||
|
"github.com/influxdata/telegraf/plugins/outputs/influxdb"
|
||||||
|
|
||||||
|
eclient "github.com/coreos/etcd/client"
|
||||||
|
)
|
||||||
|
|
||||||
|
func Test1Write(t *testing.T) {
|
||||||
|
// Delete hostname conf file
|
||||||
|
hostname, _ := os.Hostname()
|
||||||
|
os.Remove("./testdata/test1/hosts/" + hostname + ".conf")
|
||||||
|
// Get etcd client
|
||||||
|
e := NewEtcdClient("http://localhost:2379", "/telegraf")
|
||||||
|
// Delete old conf from etcd
|
||||||
|
delOptions := &eclient.DeleteOptions{
|
||||||
|
Recursive: true,
|
||||||
|
Dir: true,
|
||||||
|
}
|
||||||
|
e.Kapi.Delete(context.Background(), "/telegraf", delOptions)
|
||||||
|
|
||||||
|
// Test write dir
|
||||||
|
err := e.WriteConfigDir("./testdata/test1")
|
||||||
|
require.NoError(t, err)
|
||||||
|
resp, _ := e.Kapi.Get(context.Background(), "/telegraf/main", nil)
|
||||||
|
assert.Equal(t,
|
||||||
|
"[tags]\n dc = \"us-east-1\"\n\n[agent]\n interval = \"2s\"\n round_interval = true\n flush_interval = \"10s\"\n flush_jitter = \"0s\"\n debug = false\n hostname = \"\"\n",
|
||||||
|
resp.Node.Value)
|
||||||
|
resp, _ = e.Kapi.Get(context.Background(), "/telegraf/hosts/localhost", nil)
|
||||||
|
assert.Equal(t,
|
||||||
|
"\n[agent]\n interval = \"2s\"\n labels = [\"influx\"]\n\n[[inputs.cpu]]\n percpu = true\n totalcpu = true\n drop = [\"cpu_time*\"]\n",
|
||||||
|
resp.Node.Value)
|
||||||
|
|
||||||
|
// Test write conf
|
||||||
|
err = e.WriteLabelConfig("mylabel", "./testdata/test1/labels/network.conf")
|
||||||
|
require.NoError(t, err)
|
||||||
|
resp, _ = e.Kapi.Get(context.Background(), "/telegraf/labels/mylabel", nil)
|
||||||
|
assert.Equal(t,
|
||||||
|
"[[inputs.net]]\n\n",
|
||||||
|
resp.Node.Value)
|
||||||
|
|
||||||
|
// Test read
|
||||||
|
c := config.NewConfig()
|
||||||
|
var inputFilters []string
|
||||||
|
var outputFilters []string
|
||||||
|
c.OutputFilters = outputFilters
|
||||||
|
c.InputFilters = inputFilters
|
||||||
|
|
||||||
|
net := inputs.Inputs["net"]().(*system.NetIOStats)
|
||||||
|
influx := outputs.Outputs["influxdb"]().(*influxdb.InfluxDB)
|
||||||
|
influx.URLs = []string{"http://localhost:8086"}
|
||||||
|
influx.Database = "telegraf"
|
||||||
|
influx.Precision = "s"
|
||||||
|
|
||||||
|
c, err = e.ReadConfig(c, "mylabel,influx")
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, net, c.Inputs[0].Input,
|
||||||
|
"Testdata did not produce a correct net struct.")
|
||||||
|
assert.Equal(t, influx, c.Outputs[0].Output,
|
||||||
|
"Testdata did not produce a correct influxdb struct.")
|
||||||
|
|
||||||
|
// Test reload
|
||||||
|
shutdown := make(chan struct{})
|
||||||
|
signals := make(chan os.Signal)
|
||||||
|
go e.LaunchWatcher(shutdown, signals)
|
||||||
|
// Test write conf
|
||||||
|
err = e.WriteLabelConfig("mylabel", "./testdata/test1/labels/network2.conf")
|
||||||
|
require.NoError(t, err)
|
||||||
|
resp, _ = e.Kapi.Get(context.Background(), "/telegraf/labels/mylabel", nil)
|
||||||
|
assert.Equal(t,
|
||||||
|
"[[inputs.net]]\n\n interfaces = [\"eth0\"]\n\n",
|
||||||
|
resp.Node.Value)
|
||||||
|
// TODO found a way to test reload ....
|
||||||
|
sig := <-signals
|
||||||
|
assert.Equal(t, syscall.SIGHUP, sig)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func Test2Error(t *testing.T) {
|
||||||
|
e := NewEtcdClient("http://localhost:2379", "/telegraf")
|
||||||
|
|
||||||
|
// Test write dir
|
||||||
|
err := e.WriteConfigDir("./testdata/test2")
|
||||||
|
require.Error(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func Test3Write(t *testing.T) {
|
||||||
|
// Delete old hostname conf file
|
||||||
|
hostname, _ := os.Hostname()
|
||||||
|
os.Remove("./testdata/test1/hosts/" + hostname + ".conf")
|
||||||
|
// Write host file
|
||||||
|
if hostname != "" {
|
||||||
|
r, err := os.Open("./testdata/test1/hosts/localhost.conf")
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
defer r.Close()
|
||||||
|
|
||||||
|
w, err := os.Create("./testdata/test1/hosts/" + hostname + ".conf")
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
defer w.Close()
|
||||||
|
|
||||||
|
// do the actual work
|
||||||
|
_, err = io.Copy(w, r)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Get tcd client
|
||||||
|
e := NewEtcdClient("http://localhost:2379", "/telegraf")
|
||||||
|
// Delete old conf from etcd
|
||||||
|
delOptions := &eclient.DeleteOptions{
|
||||||
|
Recursive: true,
|
||||||
|
Dir: true,
|
||||||
|
}
|
||||||
|
e.Kapi.Delete(context.Background(), "/telegraf", delOptions)
|
||||||
|
|
||||||
|
// Test write dir
|
||||||
|
err := e.WriteConfigDir("./testdata/test1")
|
||||||
|
require.NoError(t, err)
|
||||||
|
resp, _ := e.Kapi.Get(context.Background(), "/telegraf/main", nil)
|
||||||
|
assert.Equal(t,
|
||||||
|
"[tags]\n dc = \"us-east-1\"\n\n[agent]\n interval = \"2s\"\n round_interval = true\n flush_interval = \"10s\"\n flush_jitter = \"0s\"\n debug = false\n hostname = \"\"\n",
|
||||||
|
resp.Node.Value)
|
||||||
|
resp, _ = e.Kapi.Get(context.Background(), "/telegraf/hosts/localhost", nil)
|
||||||
|
assert.Equal(t,
|
||||||
|
"\n[agent]\n interval = \"2s\"\n labels = [\"influx\"]\n\n[[inputs.cpu]]\n percpu = true\n totalcpu = true\n drop = [\"cpu_time*\"]\n",
|
||||||
|
resp.Node.Value)
|
||||||
|
|
||||||
|
// Test write conf
|
||||||
|
err = e.WriteLabelConfig("mylabel", "./testdata/test1/labels/network.conf")
|
||||||
|
require.NoError(t, err)
|
||||||
|
resp, _ = e.Kapi.Get(context.Background(), "/telegraf/labels/mylabel", nil)
|
||||||
|
assert.Equal(t,
|
||||||
|
"[[inputs.net]]\n\n",
|
||||||
|
resp.Node.Value)
|
||||||
|
|
||||||
|
// Test read
|
||||||
|
c := config.NewConfig()
|
||||||
|
var inputFilters []string
|
||||||
|
var outputFilters []string
|
||||||
|
c.OutputFilters = outputFilters
|
||||||
|
c.InputFilters = inputFilters
|
||||||
|
|
||||||
|
cpu := inputs.Inputs["cpu"]().(*system.CPUStats)
|
||||||
|
cpu.PerCPU = true
|
||||||
|
cpu.TotalCPU = true
|
||||||
|
net := inputs.Inputs["net"]().(*system.NetIOStats)
|
||||||
|
influx := outputs.Outputs["influxdb"]().(*influxdb.InfluxDB)
|
||||||
|
influx.URLs = []string{"http://localhost:8086"}
|
||||||
|
influx.Database = "telegraf"
|
||||||
|
influx.Precision = "s"
|
||||||
|
|
||||||
|
c, err = e.ReadConfig(c, "mylabel,influx")
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, cpu, c.Inputs[0].Input,
|
||||||
|
"Testdata did not produce a correct net struct.")
|
||||||
|
assert.Equal(t, net, c.Inputs[1].Input,
|
||||||
|
"Testdata did not produce a correct net struct.")
|
||||||
|
assert.Equal(t, influx, c.Outputs[0].Output,
|
||||||
|
"Testdata did not produce a correct influxdb struct.")
|
||||||
|
|
||||||
|
// Test reload
|
||||||
|
shutdown := make(chan struct{})
|
||||||
|
signals := make(chan os.Signal)
|
||||||
|
go e.LaunchWatcher(shutdown, signals)
|
||||||
|
// Test write conf
|
||||||
|
err = e.WriteLabelConfig("mylabel", "./testdata/test1/labels/network2.conf")
|
||||||
|
require.NoError(t, err)
|
||||||
|
resp, _ = e.Kapi.Get(context.Background(), "/telegraf/labels/mylabel", nil)
|
||||||
|
assert.Equal(t,
|
||||||
|
"[[inputs.net]]\n\n interfaces = [\"eth0\"]\n\n",
|
||||||
|
resp.Node.Value)
|
||||||
|
// TODO found a way to test reload ....
|
||||||
|
sig := <-signals
|
||||||
|
assert.Equal(t, syscall.SIGHUP, sig)
|
||||||
|
// Delete hostname conf file
|
||||||
|
os.Remove("./testdata/test1/hosts/" + hostname + ".conf")
|
||||||
|
}
|
|
@ -0,0 +1,9 @@
|
||||||
|
|
||||||
|
[agent]
|
||||||
|
interval = "2s"
|
||||||
|
labels = ["influx"]
|
||||||
|
|
||||||
|
[[inputs.cpu]]
|
||||||
|
percpu = true
|
||||||
|
totalcpu = true
|
||||||
|
drop = ["cpu_time*"]
|
|
@ -0,0 +1,4 @@
|
||||||
|
[[outputs.influxdb]]
|
||||||
|
urls = ["http://localhost:8086"]
|
||||||
|
database = "telegraf"
|
||||||
|
precision = "s"
|
|
@ -0,0 +1,2 @@
|
||||||
|
[[inputs.net]]
|
||||||
|
|
|
@ -0,0 +1,4 @@
|
||||||
|
[[inputs.net]]
|
||||||
|
|
||||||
|
interfaces = ["eth0"]
|
||||||
|
|
|
@ -0,0 +1,10 @@
|
||||||
|
[tags]
|
||||||
|
dc = "us-east-1"
|
||||||
|
|
||||||
|
[agent]
|
||||||
|
interval = "2s"
|
||||||
|
round_interval = true
|
||||||
|
flush_interval = "10s"
|
||||||
|
flush_jitter = "0s"
|
||||||
|
debug = false
|
||||||
|
hostname = ""
|
|
@ -0,0 +1,12 @@
|
||||||
|
[tags]
|
||||||
|
dc = "us-east-1"
|
||||||
|
|
||||||
|
[agent]
|
||||||
|
interval = "2s"
|
||||||
|
round_interval = true
|
||||||
|
flush_interval = "10s"
|
||||||
|
flush_jitter = "0s"
|
||||||
|
debug = false
|
||||||
|
hostname = ""
|
||||||
|
|
||||||
|
sdag: sdg
|
Loading…
Reference in New Issue