[WIP] Etcd integration for configuration
This commit is contained in:
parent
35150caea4
commit
d44727e2de
2
Godeps
2
Godeps
|
@ -1,4 +1,5 @@
|
|||
git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git dbd8d5c40a582eb9adacde36b47932b3a3ad0034
|
||||
github.com/BurntSushi/toml 5c4df71dfe9ac89ef6287afc05e4c1b16ae65a1e
|
||||
github.com/Shopify/sarama d37c73f2b2bce85f7fa16b6a550d26c5372892ef
|
||||
github.com/Sirupsen/logrus f7f79f729e0fbe2fcc061db48a9ba0263f588252
|
||||
github.com/amir/raidman 6a8e089bbe32e6b907feae5ba688841974b3c339
|
||||
|
@ -8,6 +9,7 @@ github.com/beorn7/perks b965b613227fddccbfffe13eae360ed3fa822f8d
|
|||
github.com/boltdb/bolt ee4a0888a9abe7eefe5a0992ca4cb06864839873
|
||||
github.com/cenkalti/backoff 4dc77674aceaabba2c7e3da25d4c823edfb73f99
|
||||
github.com/dancannon/gorethink 6f088135ff288deb9d5546f4c71919207f891a70
|
||||
github.com/coreos/etcd bdee27b19e8601ffd7bd4f0481abe9bbae04bd09
|
||||
github.com/davecgh/go-spew 5215b55f46b2b919f50a1df0eaa5886afe4e3b3d
|
||||
github.com/eapache/go-resiliency b86b1ec0dd4209a588dc1285cdd471e73525c0b3
|
||||
github.com/eapache/queue ded5959c0d4e360646dc9e9908cff48666781367
|
||||
|
|
11
Makefile
11
Makefile
|
@ -83,6 +83,7 @@ endif
|
|||
docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt
|
||||
docker run --name riemann -p "5555:5555" -d blalor/riemann
|
||||
docker run --name snmp -p "31161:31161/udp" -d titilambert/snmpsim
|
||||
docker run --name etcd -p "2379:2379" -d quay.io/coreos/etcd -name etcd0 -advertise-client-urls http://localhost:2379 -listen-client-urls http://0.0.0.0:2379
|
||||
|
||||
# Run docker containers necessary for CircleCI unit tests
|
||||
docker-run-circle:
|
||||
|
@ -97,11 +98,17 @@ docker-run-circle:
|
|||
docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt
|
||||
docker run --name riemann -p "5555:5555" -d blalor/riemann
|
||||
docker run --name snmp -p "31161:31161/udp" -d titilambert/snmpsim
|
||||
docker run --name etcd -p "2379:2379" -d quay.io/coreos/etcd -name etcd0 -advertise-client-urls http://localhost:2379 -listen-client-urls http://0.0.0.0:2379
|
||||
|
||||
# Kill all docker containers, ignore errors
|
||||
docker-kill:
|
||||
-docker kill nsq aerospike redis opentsdb rabbitmq postgres memcached mysql kafka mqtt riemann snmp
|
||||
-docker rm nsq aerospike redis opentsdb rabbitmq postgres memcached mysql kafka mqtt riemann snmp
|
||||
-docker kill nsq aerospike redis opentsdb rabbitmq postgres memcached mysql kafka mqtt riemann snmp etcd
|
||||
-docker rm nsq aerospike redis opentsdb rabbitmq postgres memcached mysql kafka mqtt riemann snmp etcd
|
||||
|
||||
# Kill all docker containers, ignore errors
|
||||
docker-kill:
|
||||
-docker kill nsq aerospike redis opentsdb rabbitmq postgres memcached mysql kafka mqtt riemann etcd
|
||||
-docker rm nsq aerospike redis opentsdb rabbitmq postgres memcached mysql kafka mqtt riemann etcd
|
||||
|
||||
# Run full unit tests using docker containers (includes setup and teardown)
|
||||
test: docker-kill docker-run
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
|
||||
"github.com/influxdata/telegraf/agent"
|
||||
"github.com/influxdata/telegraf/internal/config"
|
||||
"github.com/influxdata/telegraf/internal/etcd"
|
||||
_ "github.com/influxdata/telegraf/plugins/inputs/all"
|
||||
_ "github.com/influxdata/telegraf/plugins/outputs/all"
|
||||
)
|
||||
|
@ -20,6 +21,11 @@ var fDebug = flag.Bool("debug", false,
|
|||
var fQuiet = flag.Bool("quiet", false,
|
||||
"run in quiet mode")
|
||||
var fTest = flag.Bool("test", false, "gather metrics, print them out, and exit")
|
||||
var fEtcd = flag.String("etcd", "", "etcd url where configuration is stored")
|
||||
var fEtcdSendConfigDir = flag.String("etcdwriteconfigdir", "", "store the following config dir to etcd")
|
||||
var fEtcdSendConfig = flag.String("etcdwriteconfig", "", "store the following config file to etcd")
|
||||
var fEtcdSendLabel = flag.String("etcdwritelabel", "", "store config file to etcd with this label")
|
||||
var fEtcdReadLabels = flag.String("etcdreadlabels", "", "read config from etcd using labels (comma-separated)")
|
||||
var fConfig = flag.String("config", "", "configuration file to load")
|
||||
var fConfigDirectory = flag.String("config-directory", "",
|
||||
"directory containing additional *.conf files")
|
||||
|
@ -53,16 +59,20 @@ Usage:
|
|||
|
||||
The flags are:
|
||||
|
||||
-config <file> configuration file to load
|
||||
-test gather metrics once, print them to stdout, and exit
|
||||
-sample-config print out full sample configuration to stdout
|
||||
-config-directory directory containing additional *.conf files
|
||||
-input-filter filter the input plugins to enable, separator is :
|
||||
-output-filter filter the output plugins to enable, separator is :
|
||||
-usage print usage for a plugin, ie, 'telegraf -usage mysql'
|
||||
-debug print metrics as they're generated to stdout
|
||||
-quiet run in quiet mode
|
||||
-version print the version to stdout
|
||||
-config <file> configuration file to load
|
||||
-test gather metrics once, print them to stdout, and exit
|
||||
-sample-config print out full sample configuration to stdout
|
||||
-config-directory directory containing additional *.conf files
|
||||
-etcdwriteconfigdir store the following config dir to etcd
|
||||
-etcdwriteconfig store the following config file to etcd
|
||||
-etcdwritelabel store config file to etcd with this label
|
||||
-etcdreadlabels read config from etcd using labels (comma-separated)
|
||||
-input-filter filter the input plugins to enable, separator is :
|
||||
-output-filter filter the output plugins to enable, separator is :
|
||||
-usage print usage for a plugin, ie, 'telegraf -usage mysql'
|
||||
-debug print metrics as they're generated to stdout
|
||||
-quiet run in quiet mode
|
||||
-version print the version to stdout
|
||||
|
||||
Examples:
|
||||
|
||||
|
@ -83,63 +93,138 @@ Examples:
|
|||
`
|
||||
|
||||
func main() {
|
||||
// Read flags
|
||||
flag.Usage = func() { usageExit(0) }
|
||||
flag.Parse()
|
||||
if flag.NFlag() == 0 {
|
||||
usageExit(0)
|
||||
}
|
||||
|
||||
// Prepare signals handling
|
||||
reload := make(chan bool, 1)
|
||||
reload <- true
|
||||
for <-reload {
|
||||
reload <- false
|
||||
flag.Usage = func() { usageExit(0) }
|
||||
flag.Parse()
|
||||
shutdown := make(chan struct{})
|
||||
signals := make(chan os.Signal)
|
||||
signal.Notify(signals, os.Interrupt, syscall.SIGHUP)
|
||||
|
||||
if flag.NFlag() == 0 {
|
||||
usageExit(0)
|
||||
// Prepare etcd if needed
|
||||
var e *etcd.EtcdClient
|
||||
if *fEtcd != "" {
|
||||
e = etcd.NewEtcdClient(*fEtcd, "/telegraf")
|
||||
if *fEtcdSendConfig == "" && *fEtcdSendLabel == "" && *fEtcdSendConfigDir == "" {
|
||||
go e.LaunchWatcher(shutdown, signals)
|
||||
}
|
||||
}
|
||||
|
||||
var inputFilters []string
|
||||
if *fInputFiltersLegacy != "" {
|
||||
inputFilter := strings.TrimSpace(*fInputFiltersLegacy)
|
||||
inputFilters = strings.Split(":"+inputFilter+":", ":")
|
||||
}
|
||||
if *fInputFilters != "" {
|
||||
inputFilter := strings.TrimSpace(*fInputFilters)
|
||||
inputFilters = strings.Split(":"+inputFilter+":", ":")
|
||||
}
|
||||
|
||||
var outputFilters []string
|
||||
if *fOutputFiltersLegacy != "" {
|
||||
outputFilter := strings.TrimSpace(*fOutputFiltersLegacy)
|
||||
outputFilters = strings.Split(":"+outputFilter+":", ":")
|
||||
}
|
||||
if *fOutputFilters != "" {
|
||||
outputFilter := strings.TrimSpace(*fOutputFilters)
|
||||
outputFilters = strings.Split(":"+outputFilter+":", ":")
|
||||
}
|
||||
|
||||
if *fVersion {
|
||||
v := fmt.Sprintf("Telegraf - Version %s", Version)
|
||||
fmt.Println(v)
|
||||
return
|
||||
}
|
||||
|
||||
if *fSampleConfig {
|
||||
config.PrintSampleConfig(inputFilters, outputFilters)
|
||||
return
|
||||
}
|
||||
|
||||
if *fUsage != "" {
|
||||
if err := config.PrintInputConfig(*fUsage); err != nil {
|
||||
if err2 := config.PrintOutputConfig(*fUsage); err2 != nil {
|
||||
log.Fatalf("%s and %s", err, err2)
|
||||
}
|
||||
// Handle signals
|
||||
go func() {
|
||||
for {
|
||||
sig := <-signals
|
||||
if sig == os.Interrupt {
|
||||
close(shutdown)
|
||||
} else if sig == syscall.SIGHUP {
|
||||
log.Print("Reloading Telegraf config\n")
|
||||
<-reload
|
||||
reload <- true
|
||||
close(shutdown)
|
||||
}
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
// Prepare inputs
|
||||
var inputFilters []string
|
||||
if *fInputFiltersLegacy != "" {
|
||||
inputFilter := strings.TrimSpace(*fInputFiltersLegacy)
|
||||
inputFilters = strings.Split(":"+inputFilter+":", ":")
|
||||
}
|
||||
if *fInputFilters != "" {
|
||||
inputFilter := strings.TrimSpace(*fInputFilters)
|
||||
inputFilters = strings.Split(":"+inputFilter+":", ":")
|
||||
}
|
||||
|
||||
// Prepare outputs
|
||||
var outputFilters []string
|
||||
if *fOutputFiltersLegacy != "" {
|
||||
outputFilter := strings.TrimSpace(*fOutputFiltersLegacy)
|
||||
outputFilters = strings.Split(":"+outputFilter+":", ":")
|
||||
}
|
||||
if *fOutputFilters != "" {
|
||||
outputFilter := strings.TrimSpace(*fOutputFilters)
|
||||
outputFilters = strings.Split(":"+outputFilter+":", ":")
|
||||
}
|
||||
|
||||
// Print version
|
||||
if *fVersion {
|
||||
v := fmt.Sprintf("Telegraf - Version %s", Version)
|
||||
fmt.Println(v)
|
||||
return
|
||||
}
|
||||
|
||||
// Print sample config
|
||||
if *fSampleConfig {
|
||||
config.PrintSampleConfig(inputFilters, outputFilters)
|
||||
return
|
||||
}
|
||||
|
||||
// Print usage
|
||||
if *fUsage != "" {
|
||||
if err := config.PrintInputConfig(*fUsage); err != nil {
|
||||
if err2 := config.PrintOutputConfig(*fUsage); err2 != nil {
|
||||
log.Fatalf("%s and %s", err, err2)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
for <-reload {
|
||||
// Reset signal handler vars
|
||||
shutdown = make(chan struct{})
|
||||
reload <- false
|
||||
|
||||
// Prepare config
|
||||
var (
|
||||
c *config.Config
|
||||
err error
|
||||
)
|
||||
|
||||
if *fConfig != "" {
|
||||
if *fEtcd != "" {
|
||||
c = config.NewConfig()
|
||||
c.OutputFilters = outputFilters
|
||||
c.InputFilters = inputFilters
|
||||
if *fEtcdSendConfigDir != "" {
|
||||
// TODO check config format before write it
|
||||
// Write config dir to etcd
|
||||
err = c.LoadDirectory(*fEtcdSendConfigDir)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
err = e.WriteConfigDir(*fEtcdSendConfigDir)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
return
|
||||
} else if *fEtcdSendConfig != "" && *fEtcdSendLabel != "" {
|
||||
// TODO check config format before write it
|
||||
// Write config to etcd
|
||||
err = c.LoadConfig(*fEtcdSendConfig)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
err = e.WriteLabelConfig(*fEtcdSendLabel, *fEtcdSendConfig)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
return
|
||||
} else {
|
||||
// Read config to etcd
|
||||
log.Printf("Config read from etcd with labels %s\n", *fEtcdReadLabels)
|
||||
c, err = e.ReadConfig(c, *fEtcdReadLabels)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
}
|
||||
} else if *fConfig != "" {
|
||||
// Read config from file
|
||||
c = config.NewConfig()
|
||||
c.OutputFilters = outputFilters
|
||||
c.InputFilters = inputFilters
|
||||
|
@ -152,6 +237,7 @@ func main() {
|
|||
os.Exit(1)
|
||||
}
|
||||
|
||||
// Read config dir
|
||||
if *fConfigDirectoryLegacy != "" {
|
||||
err = c.LoadDirectory(*fConfigDirectoryLegacy)
|
||||
if err != nil {
|
||||
|
@ -159,12 +245,14 @@ func main() {
|
|||
}
|
||||
}
|
||||
|
||||
// Read config dir
|
||||
if *fConfigDirectory != "" {
|
||||
err = c.LoadDirectory(*fConfigDirectory)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
}
|
||||
// check config
|
||||
if len(c.Outputs) == 0 {
|
||||
log.Fatalf("Error: no outputs found, did you provide a valid config file?")
|
||||
}
|
||||
|
@ -198,22 +286,6 @@ func main() {
|
|||
log.Fatal(err)
|
||||
}
|
||||
|
||||
shutdown := make(chan struct{})
|
||||
signals := make(chan os.Signal)
|
||||
signal.Notify(signals, os.Interrupt, syscall.SIGHUP)
|
||||
go func() {
|
||||
sig := <-signals
|
||||
if sig == os.Interrupt {
|
||||
close(shutdown)
|
||||
}
|
||||
if sig == syscall.SIGHUP {
|
||||
log.Printf("Reloading Telegraf config\n")
|
||||
<-reload
|
||||
reload <- true
|
||||
close(shutdown)
|
||||
}
|
||||
}()
|
||||
|
||||
log.Printf("Starting Telegraf (version %s)\n", Version)
|
||||
log.Printf("Loaded outputs: %s", strings.Join(c.OutputNames(), " "))
|
||||
log.Printf("Loaded inputs: %s", strings.Join(c.InputNames(), " "))
|
||||
|
|
|
@ -17,6 +17,7 @@ import (
|
|||
"github.com/influxdata/telegraf/plugins/outputs"
|
||||
|
||||
"github.com/influxdata/config"
|
||||
"github.com/naoina/toml"
|
||||
"github.com/naoina/toml/ast"
|
||||
)
|
||||
|
||||
|
@ -39,6 +40,7 @@ func NewConfig() *Config {
|
|||
Agent: &AgentConfig{
|
||||
Interval: internal.Duration{Duration: 10 * time.Second},
|
||||
RoundInterval: true,
|
||||
Labels: make([]string, 0),
|
||||
FlushInterval: internal.Duration{Duration: 10 * time.Second},
|
||||
FlushJitter: internal.Duration{Duration: 5 * time.Second},
|
||||
},
|
||||
|
@ -92,6 +94,9 @@ type AgentConfig struct {
|
|||
// Quiet is the option for running in quiet mode
|
||||
Quiet bool
|
||||
Hostname string
|
||||
|
||||
// Etcd labels
|
||||
Labels []string
|
||||
}
|
||||
|
||||
// Inputs returns a list of strings of the configured inputs.
|
||||
|
@ -316,7 +321,16 @@ func (c *Config) LoadDirectory(path string) error {
|
|||
|
||||
// LoadConfig loads the given config file and applies it to c
|
||||
func (c *Config) LoadConfig(path string) error {
|
||||
tbl, err := config.ParseFile(path)
|
||||
data, err := ioutil.ReadFile(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = c.LoadConfigFromText(data)
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *Config) LoadConfigFromText(data []byte) error {
|
||||
tbl, err := toml.Parse(data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -0,0 +1,268 @@
|
|||
package etcd
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"golang.org/x/net/context"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"os"
|
||||
"path"
|
||||
"strings"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/BurntSushi/toml"
|
||||
"github.com/coreos/etcd/client"
|
||||
|
||||
"github.com/influxdata/telegraf/internal/config"
|
||||
)
|
||||
|
||||
type EtcdClient struct {
|
||||
Kapi client.KeysAPI
|
||||
Folder string
|
||||
}
|
||||
|
||||
func (e *EtcdClient) LaunchWatcher(shutdown chan struct{}, signals chan os.Signal) {
|
||||
// TODO: All telegraf client will reload for each changes...
|
||||
// Maybe we want to reload on those we need to ???
|
||||
// So we need to create a watcher by labels ??
|
||||
|
||||
for {
|
||||
watcherOpts := client.WatcherOptions{AfterIndex: 0, Recursive: true}
|
||||
w := e.Kapi.Watcher(e.Folder, &watcherOpts)
|
||||
r, err := w.Next(context.Background())
|
||||
if err != nil {
|
||||
// TODO What we have to do here ????
|
||||
log.Fatal("Error occurred", err)
|
||||
}
|
||||
if r.Action == "set" || r.Action == "update" {
|
||||
// do something with Response r here
|
||||
log.Printf("Changes detected in etcd (%s action detected)\n", r.Action)
|
||||
log.Print("Reloading Telegraf config\n")
|
||||
signals <- syscall.SIGHUP
|
||||
time.Sleep(time.Duration(1) * time.Second)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func NewEtcdClient(urls string, folder string) *EtcdClient {
|
||||
// Create a new etcd client
|
||||
cfg := client.Config{
|
||||
Endpoints: []string{"http://127.0.0.1:2379"},
|
||||
Transport: client.DefaultTransport,
|
||||
}
|
||||
|
||||
e := &EtcdClient{}
|
||||
c, err := client.New(cfg)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
kapi := client.NewKeysAPI(c)
|
||||
|
||||
e.Kapi = kapi
|
||||
e.Folder = folder
|
||||
|
||||
return e
|
||||
}
|
||||
|
||||
func (e *EtcdClient) Connect() error {
|
||||
//c, err := eclient.New(cfg)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *EtcdClient) WriteConfigDir(configdir string) error {
|
||||
directoryEntries, err := ioutil.ReadDir(configdir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, entry := range directoryEntries {
|
||||
name := entry.Name()
|
||||
if entry.IsDir() {
|
||||
if name == "labels" {
|
||||
// Handle labels
|
||||
directoryEntries, err := ioutil.ReadDir(path.Join(configdir, name))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, entry := range directoryEntries {
|
||||
filename := entry.Name()
|
||||
if len(filename) < 6 || filename[len(filename)-5:] != ".conf" {
|
||||
continue
|
||||
}
|
||||
label := filename[:len(filename)-5]
|
||||
err = e.WriteLabelConfig(label, path.Join(configdir, name, filename))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
} else if name == "hosts" {
|
||||
// Handle hosts specific config
|
||||
directoryEntries, err := ioutil.ReadDir(path.Join(configdir, name))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, entry := range directoryEntries {
|
||||
filename := entry.Name()
|
||||
if len(filename) < 6 || filename[len(filename)-5:] != ".conf" {
|
||||
continue
|
||||
}
|
||||
hostname := filename[:len(filename)-5]
|
||||
err = e.WriteHostConfig(hostname, path.Join(configdir, name, filename))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
continue
|
||||
}
|
||||
if name == "main.conf" {
|
||||
// Handle main config
|
||||
err := e.WriteMainConfig(path.Join(configdir, name))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *EtcdClient) WriteMainConfig(path string) error {
|
||||
// Write main config file in etcd
|
||||
key := "main"
|
||||
err := e.WriteConfig(key, path)
|
||||
return err
|
||||
}
|
||||
|
||||
func (e *EtcdClient) WriteLabelConfig(label string, path string) error {
|
||||
// Write label config file in etcd
|
||||
key := "labels/" + label
|
||||
err := e.WriteConfig(key, path)
|
||||
return err
|
||||
}
|
||||
|
||||
func (e *EtcdClient) WriteHostConfig(host string, path string) error {
|
||||
// Write host config file in etcd
|
||||
key := "hosts/" + host
|
||||
err := e.WriteConfig(key, path)
|
||||
return err
|
||||
}
|
||||
|
||||
func (e *EtcdClient) WriteConfig(relative_key string, path string) error {
|
||||
// Read config file, get conf in tomlformat, convert to json
|
||||
// Then write it to etcd
|
||||
// TODO: Maybe we just want to store toml in etcd ? Is json really needed ????
|
||||
// Read file
|
||||
raw_data, err := ioutil.ReadFile(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Get toml
|
||||
var data interface{}
|
||||
_, err = toml.Decode(string(raw_data), &data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Get json
|
||||
json_data, _ := json.Marshal(&data)
|
||||
// Write it
|
||||
key := e.Folder + "/" + relative_key
|
||||
resp, _ := e.Kapi.Get(context.Background(), key, nil)
|
||||
if resp == nil {
|
||||
_, err = e.Kapi.Set(context.Background(), key, string(json_data), nil)
|
||||
} else {
|
||||
_, err = e.Kapi.Update(context.Background(), key, string(json_data))
|
||||
}
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
return err
|
||||
} else {
|
||||
log.Printf("Config written with key %s\n", key)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
//func (e *EtcdClient) ReadConfig(labels []string) (*config.Config, error) {
|
||||
func (e *EtcdClient) ReadConfig(c *config.Config, labels string) (*config.Config, error) {
|
||||
// Get default config in etcd
|
||||
// key = /telegraf/default
|
||||
key := e.Folder + "/main"
|
||||
resp, err := e.Kapi.Get(context.Background(), key, nil)
|
||||
if err != nil {
|
||||
log.Printf("WARNING: [etcd] %s", err)
|
||||
} else {
|
||||
// Put it in toml
|
||||
data, err := json2toml(resp)
|
||||
if err != nil {
|
||||
log.Printf("WARNING: [etcd] %s", err)
|
||||
}
|
||||
c.LoadConfigFromText(data)
|
||||
}
|
||||
|
||||
// Get specific host config
|
||||
// key = /telegraf/hosts/HOSTNAME
|
||||
hostname, err := os.Hostname()
|
||||
if err != nil {
|
||||
log.Printf("WARNING: [etcd] %s", err)
|
||||
} else if hostname != "" {
|
||||
key = e.Folder + "/hosts/" + hostname
|
||||
resp, err := e.Kapi.Get(context.Background(), key, nil)
|
||||
if err != nil {
|
||||
log.Printf("WARNING: [etcd] %s", err)
|
||||
} else {
|
||||
// Put it in toml
|
||||
data, err := json2toml(resp)
|
||||
if err != nil {
|
||||
log.Print(err)
|
||||
}
|
||||
c.LoadConfigFromText(data)
|
||||
}
|
||||
}
|
||||
|
||||
// Concat labels from etcd and labels from command line
|
||||
labels_list := c.Agent.Labels
|
||||
if labels != "" {
|
||||
labels_list = append(labels_list, strings.Split(labels, ",")...)
|
||||
}
|
||||
|
||||
// Iterate on all labels
|
||||
// TODO check label order of importance ?
|
||||
for _, label := range labels_list {
|
||||
// Read from etcd
|
||||
// key = /telegraf/labels/LABEL
|
||||
key := e.Folder + "/labels/" + label
|
||||
resp, err := e.Kapi.Get(context.Background(), key, nil)
|
||||
if err != nil {
|
||||
log.Print(err)
|
||||
continue
|
||||
}
|
||||
// Put it in toml
|
||||
data, err := json2toml(resp)
|
||||
if err != nil {
|
||||
log.Print(err)
|
||||
continue
|
||||
}
|
||||
// Load config
|
||||
err = c.LoadConfigFromText(data)
|
||||
if err != nil {
|
||||
log.Print(err)
|
||||
}
|
||||
}
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func json2toml(resp *client.Response) ([]byte, error) {
|
||||
// Convert json to toml
|
||||
var json_data interface{}
|
||||
var data []byte
|
||||
json.Unmarshal([]byte(resp.Node.Value), &json_data)
|
||||
buf := new(bytes.Buffer)
|
||||
err := toml.NewEncoder(buf).Encode(json_data)
|
||||
data = buf.Bytes()
|
||||
return data, err
|
||||
}
|
|
@ -0,0 +1,74 @@
|
|||
package etcd
|
||||
|
||||
import (
|
||||
"golang.org/x/net/context"
|
||||
"os"
|
||||
"syscall"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/influxdata/telegraf/internal/config"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
"github.com/influxdata/telegraf/plugins/outputs"
|
||||
|
||||
"github.com/influxdata/telegraf/plugins/inputs/system"
|
||||
"github.com/influxdata/telegraf/plugins/outputs/influxdb"
|
||||
)
|
||||
|
||||
func TestWrite(t *testing.T) {
|
||||
e := NewEtcdClient("http://localhost:2379", "/telegraf")
|
||||
|
||||
// Test write dir
|
||||
err := e.WriteConfigDir("./testdata/")
|
||||
require.NoError(t, err)
|
||||
resp, _ := e.Kapi.Get(context.Background(), "/telegraf/main", nil)
|
||||
assert.Equal(t,
|
||||
`{"agent":{"debug":false,"flush_interval":"10s","flush_jitter":"0s","hostname":"","interval":"2s","round_interval":true},"tags":{"dc":"us-east-1"}}`,
|
||||
resp.Node.Value)
|
||||
resp, _ = e.Kapi.Get(context.Background(), "/telegraf/hosts/localhost", nil)
|
||||
assert.Equal(t,
|
||||
`{"agent":{"interval":"2s","labels":["influx"]},"inputs":{"cpu":[{"drop":["cpu_time*"],"percpu":true,"totalcpu":true}]}}`,
|
||||
resp.Node.Value)
|
||||
|
||||
// Test write conf
|
||||
err = e.WriteLabelConfig("mylabel", "./testdata/labels/network.conf")
|
||||
require.NoError(t, err)
|
||||
resp, _ = e.Kapi.Get(context.Background(), "/telegraf/labels/mylabel", nil)
|
||||
assert.Equal(t, `{"inputs":{"net":[{}]}}`, resp.Node.Value)
|
||||
|
||||
// Test read
|
||||
c := config.NewConfig()
|
||||
var inputFilters []string
|
||||
var outputFilters []string
|
||||
c.OutputFilters = outputFilters
|
||||
c.InputFilters = inputFilters
|
||||
|
||||
net := inputs.Inputs["net"]().(*system.NetIOStats)
|
||||
influx := outputs.Outputs["influxdb"]().(*influxdb.InfluxDB)
|
||||
influx.URLs = []string{"http://localhost:8086"}
|
||||
influx.Database = "telegraf"
|
||||
influx.Precision = "s"
|
||||
|
||||
c, err = e.ReadConfig(c, "mylabel,influx")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, net, c.Inputs[0].Input,
|
||||
"Testdata did not produce a correct net struct.")
|
||||
assert.Equal(t, influx, c.Outputs[0].Output,
|
||||
"Testdata did not produce a correct influxdb struct.")
|
||||
|
||||
// Test reload
|
||||
shutdown := make(chan struct{})
|
||||
signals := make(chan os.Signal)
|
||||
go e.LaunchWatcher(shutdown, signals)
|
||||
// Test write conf
|
||||
err = e.WriteLabelConfig("mylabel", "./testdata/labels/network2.conf")
|
||||
require.NoError(t, err)
|
||||
resp, _ = e.Kapi.Get(context.Background(), "/telegraf/labels/mylabel", nil)
|
||||
assert.Equal(t, `{"inputs":{"net":[{"interfaces":["eth0"]}]}}`, resp.Node.Value)
|
||||
// TODO found a way to test reload ....
|
||||
sig := <-signals
|
||||
assert.Equal(t, syscall.SIGHUP, sig)
|
||||
|
||||
}
|
|
@ -0,0 +1,9 @@
|
|||
|
||||
[agent]
|
||||
interval = "2s"
|
||||
labels = ["influx"]
|
||||
|
||||
[[inputs.cpu]]
|
||||
percpu = true
|
||||
totalcpu = true
|
||||
drop = ["cpu_time*"]
|
|
@ -0,0 +1,4 @@
|
|||
[[outputs.influxdb]]
|
||||
urls = ["http://localhost:8086"]
|
||||
database = "telegraf"
|
||||
precision = "s"
|
|
@ -0,0 +1,2 @@
|
|||
[[inputs.net]]
|
||||
|
|
@ -0,0 +1,4 @@
|
|||
[[inputs.net]]
|
||||
|
||||
interfaces = ["eth0"]
|
||||
|
|
@ -0,0 +1,10 @@
|
|||
[tags]
|
||||
dc = "us-east-1"
|
||||
|
||||
[agent]
|
||||
interval = "2s"
|
||||
round_interval = true
|
||||
flush_interval = "10s"
|
||||
flush_jitter = "0s"
|
||||
debug = false
|
||||
hostname = ""
|
|
@ -83,6 +83,7 @@ func TestGraphiteOK(t *testing.T) {
|
|||
wg.Add(1)
|
||||
// Waiting TCPserver
|
||||
wg.Wait()
|
||||
require.NoError(t, err2)
|
||||
g.Close()
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue