From c4d5fda18895de82ab7d43e621bed8e6afdadc1a Mon Sep 17 00:00:00 2001 From: Thibault Cohen Date: Fri, 5 Feb 2016 01:03:05 -0500 Subject: [PATCH] [WIP] Etcd integration for configuration --- Godeps | 1 + Makefile | 6 +- cmd/telegraf/telegraf.go | 274 ++++++++++++------ docs/CONFIGURATION.md | 191 ++++++++++++ internal/config/config.go | 35 ++- internal/etcd/etcd.go | 262 +++++++++++++++++ internal/etcd/etcd_test.go | 195 +++++++++++++ .../etcd/testdata/test1/hosts/localhost.conf | 9 + .../etcd/testdata/test1/labels/influx.conf | 4 + .../etcd/testdata/test1/labels/network.conf | 2 + .../etcd/testdata/test1/labels/network2.conf | 4 + internal/etcd/testdata/test1/main.conf | 10 + internal/etcd/testdata/test2/main.conf | 12 + 13 files changed, 901 insertions(+), 104 deletions(-) create mode 100644 internal/etcd/etcd.go create mode 100644 internal/etcd/etcd_test.go create mode 100644 internal/etcd/testdata/test1/hosts/localhost.conf create mode 100644 internal/etcd/testdata/test1/labels/influx.conf create mode 100644 internal/etcd/testdata/test1/labels/network.conf create mode 100644 internal/etcd/testdata/test1/labels/network2.conf create mode 100644 internal/etcd/testdata/test1/main.conf create mode 100644 internal/etcd/testdata/test2/main.conf diff --git a/Godeps b/Godeps index 2fc53d8c5..2de0c41c2 100644 --- a/Godeps +++ b/Godeps @@ -8,6 +8,7 @@ github.com/couchbase/go-couchbase cb664315a324d87d19c879d9cc67fda6be8c2ac1 github.com/couchbase/gomemcached a5ea6356f648fec6ab89add00edd09151455b4b2 github.com/couchbase/goutils 5823a0cbaaa9008406021dc5daf80125ea30bba6 github.com/dancannon/gorethink e7cac92ea2bc52638791a021f212145acfedb1fc +github.com/coreos/etcd bdee27b19e8601ffd7bd4f0481abe9bbae04bd09 github.com/davecgh/go-spew 5215b55f46b2b919f50a1df0eaa5886afe4e3b3d github.com/eapache/go-resiliency b86b1ec0dd4209a588dc1285cdd471e73525c0b3 github.com/eapache/queue ded5959c0d4e360646dc9e9908cff48666781367 diff --git a/Makefile b/Makefile index c87f78b55..ccad3a67c 100644 --- a/Makefile +++ b/Makefile @@ -71,6 +71,7 @@ endif docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt docker run --name riemann -p "5555:5555" -d blalor/riemann docker run --name snmp -p "31161:31161/udp" -d titilambert/snmpsim + docker run --name etcd -p "2379:2379" -d quay.io/coreos/etcd -name etcd0 -advertise-client-urls http://localhost:2379 -listen-client-urls http://0.0.0.0:2379 # Run docker containers necessary for CircleCI unit tests docker-run-circle: @@ -85,11 +86,12 @@ docker-run-circle: docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt docker run --name riemann -p "5555:5555" -d blalor/riemann docker run --name snmp -p "31161:31161/udp" -d titilambert/snmpsim + docker run --name etcd -p "2379:2379" -d quay.io/coreos/etcd -name etcd0 -advertise-client-urls http://localhost:2379 -listen-client-urls http://0.0.0.0:2379 # Kill all docker containers, ignore errors docker-kill: - -docker kill nsq aerospike redis opentsdb rabbitmq postgres memcached mysql kafka mqtt riemann snmp - -docker rm nsq aerospike redis opentsdb rabbitmq postgres memcached mysql kafka mqtt riemann snmp + -docker kill nsq aerospike redis opentsdb rabbitmq postgres memcached mysql kafka mqtt riemann snmp etcd + -docker rm nsq aerospike redis opentsdb rabbitmq postgres memcached mysql kafka mqtt riemann snmp etcd # Run full unit tests using docker containers (includes setup and teardown) test: vet docker-kill docker-run diff --git a/cmd/telegraf/telegraf.go b/cmd/telegraf/telegraf.go index be591829b..edeec6d15 100644 --- a/cmd/telegraf/telegraf.go +++ b/cmd/telegraf/telegraf.go @@ -11,6 +11,7 @@ import ( "github.com/influxdata/telegraf/agent" "github.com/influxdata/telegraf/internal/config" + "github.com/influxdata/telegraf/internal/etcd" "github.com/influxdata/telegraf/plugins/inputs" _ "github.com/influxdata/telegraf/plugins/inputs/all" "github.com/influxdata/telegraf/plugins/outputs" @@ -22,6 +23,13 @@ var fDebug = flag.Bool("debug", false, var fQuiet = flag.Bool("quiet", false, "run in quiet mode") var fTest = flag.Bool("test", false, "gather metrics, print them out, and exit") +var fEtcd = flag.String("etcd", "", "etcd urls where configuration is stored (comma separated)") +var fEtcdFolder = flag.String("etcdfolder", "/telegraf", "etcd root folder where configuration is stored") +var fEtcdSendConfigDir = flag.String("etcdwriteconfigdir", "", "store the following config dir to etcd") +var fEtcdSendConfig = flag.String("etcdwriteconfig", "", "store the following config file to etcd") +var fEtcdEraseConfig = flag.Bool("etcderaseconfig", false, "erase all telegraf config in etcd") +var fEtcdSendLabel = flag.String("etcdwritelabel", "", "store config file to etcd with this label") +var fEtcdReadLabels = flag.String("etcdreadlabels", "", "read config from etcd using labels (comma-separated)") var fConfig = flag.String("config", "", "configuration file to load") var fConfigDirectory = flag.String("config-directory", "", "directory containing additional *.conf files") @@ -58,18 +66,25 @@ Usage: The flags are: - -config configuration file to load - -test gather metrics once, print them to stdout, and exit - -sample-config print out full sample configuration to stdout - -config-directory directory containing additional *.conf files - -input-filter filter the input plugins to enable, separator is : - -input-list print all the plugins inputs - -output-filter filter the output plugins to enable, separator is : - -output-list print all the available outputs - -usage print usage for a plugin, ie, 'telegraf -usage mysql' - -debug print metrics as they're generated to stdout - -quiet run in quiet mode - -version print the version to stdout + -config configuration file to load + -test gather metrics once, print them to stdout, and exit + -sample-config print out full sample configuration to stdout + -config-directory directory containing additional *.conf files + -etcd etcd urls where configuration is stored (comma separated) + -etcdfolder etcd folder where configuration is stored and read + -etcdwriteconfigdir store the following config dir to etcd + -etcdwriteconfig store the following config file to etcd + -etcdwritelabel store config file to etcd with this label + -etcdreadlabels read config from etcd using labels (comma-separated) + -etcderaseconfig erase all telegraf config in etcd + -input-filter filter the input plugins to enable, separator is : + -input-list print all the plugins inputs + -output-filter filter the output plugins to enable, separator is : + -output-list print all the available outputs + -usage print usage for a plugin, ie, 'telegraf -usage mysql' + -debug print metrics as they're generated to stdout + -quiet run in quiet mode + -version print the version to stdout Examples: @@ -90,92 +105,184 @@ Examples: ` func main() { + // Read flags + flag.Usage = func() { usageExit(0) } + flag.Parse() + args := flag.Args() + if flag.NFlag() == 0 && len(args) == 0 { + usageExit(0) + } + + // Prepare signals handling reload := make(chan bool, 1) reload <- true - for <-reload { - reload <- false - flag.Usage = func() { usageExit(0) } - flag.Parse() - args := flag.Args() + shutdown := make(chan struct{}) + signals := make(chan os.Signal) + signal.Notify(signals, os.Interrupt, syscall.SIGHUP) - if flag.NFlag() == 0 && len(args) == 0 { - usageExit(0) + // Prepare etcd if needed + var e *etcd.EtcdClient + if *fEtcd != "" { + e = etcd.NewEtcdClient(*fEtcd, *fEtcdFolder) + if *fEtcdSendConfig == "" && *fEtcdSendLabel == "" && *fEtcdSendConfigDir == "" { + go e.LaunchWatcher(shutdown, signals) } + } - var inputFilters []string - if *fInputFiltersLegacy != "" { - inputFilter := strings.TrimSpace(*fInputFiltersLegacy) - inputFilters = strings.Split(":"+inputFilter+":", ":") - } - if *fInputFilters != "" { - inputFilter := strings.TrimSpace(*fInputFilters) - inputFilters = strings.Split(":"+inputFilter+":", ":") - } - - var outputFilters []string - if *fOutputFiltersLegacy != "" { - outputFilter := strings.TrimSpace(*fOutputFiltersLegacy) - outputFilters = strings.Split(":"+outputFilter+":", ":") - } - if *fOutputFilters != "" { - outputFilter := strings.TrimSpace(*fOutputFilters) - outputFilters = strings.Split(":"+outputFilter+":", ":") - } - - if len(args) > 0 { - switch args[0] { - case "version": - v := fmt.Sprintf("Telegraf - Version %s", Version) - fmt.Println(v) - return - case "config": - config.PrintSampleConfig(inputFilters, outputFilters) - return + // Handle signals + go func() { + for { + sig := <-signals + if sig == os.Interrupt { + close(shutdown) + } else if sig == syscall.SIGHUP { + log.Print("Reloading Telegraf config\n") + <-reload + reload <- true + close(shutdown) } } + }() - if *fOutputList { - fmt.Println("Available Output Plugins:") - for k, _ := range outputs.Outputs { - fmt.Printf(" %s\n", k) - } - return - } + // Prepare inputs + var inputFilters []string + if *fInputFiltersLegacy != "" { + inputFilter := strings.TrimSpace(*fInputFiltersLegacy) + inputFilters = strings.Split(":"+inputFilter+":", ":") + } + if *fInputFilters != "" { + inputFilter := strings.TrimSpace(*fInputFilters) + inputFilters = strings.Split(":"+inputFilter+":", ":") + } - if *fInputList { - fmt.Println("Available Input Plugins:") - for k, _ := range inputs.Inputs { - fmt.Printf(" %s\n", k) - } - return - } + // Prepare outputs + var outputFilters []string + if *fOutputFiltersLegacy != "" { + outputFilter := strings.TrimSpace(*fOutputFiltersLegacy) + outputFilters = strings.Split(":"+outputFilter+":", ":") + } + if *fOutputFilters != "" { + outputFilter := strings.TrimSpace(*fOutputFilters) + outputFilters = strings.Split(":"+outputFilter+":", ":") + } - if *fVersion { + if len(args) > 0 { + switch args[0] { + case "version": v := fmt.Sprintf("Telegraf - Version %s", Version) fmt.Println(v) return - } - - if *fSampleConfig { + case "config": config.PrintSampleConfig(inputFilters, outputFilters) return } + } - if *fUsage != "" { - if err := config.PrintInputConfig(*fUsage); err != nil { - if err2 := config.PrintOutputConfig(*fUsage); err2 != nil { - log.Fatalf("%s and %s", err, err2) - } - } - return + if *fOutputList { + fmt.Println("Available Output Plugins:") + for k, _ := range outputs.Outputs { + fmt.Printf(" %s\n", k) } + return + } + if *fInputList { + fmt.Println("Available Input Plugins:") + for k, _ := range inputs.Inputs { + fmt.Printf(" %s\n", k) + } + return + } + + // Print version + if *fVersion { + v := fmt.Sprintf("Telegraf - Version %s", Version) + fmt.Println(v) + return + } + + // Print sample config + if *fSampleConfig { + config.PrintSampleConfig(inputFilters, outputFilters) + return + } + + // Print usage + if *fUsage != "" { + if err := config.PrintInputConfig(*fUsage); err != nil { + if err2 := config.PrintOutputConfig(*fUsage); err2 != nil { + log.Fatalf("%s and %s", err, err2) + } + } + return + } + + for <-reload { + // Reset signal handler vars + shutdown = make(chan struct{}) + reload <- false + + // Prepare config var ( c *config.Config err error ) - if *fConfig != "" { + if *fEtcd != "" { + c = config.NewConfig() + c.OutputFilters = outputFilters + c.InputFilters = inputFilters + + if *fEtcdSendConfigDir != "" { + // TODO check config format before write it + // Erase config in etcd + if *fEtcdEraseConfig { + err = e.DeleteConfig("") + if err != nil { + err = fmt.Errorf("Error erasing Telegraf Etcd Config: %s", err) + log.Fatal(err) + } + } + // Write config dir to etcd + err = c.LoadDirectory(*fEtcdSendConfigDir) + if err != nil { + log.Fatal(err) + } + err = e.WriteConfigDir(*fEtcdSendConfigDir) + if err != nil { + log.Fatal(err) + } + return + } else if *fEtcdSendConfig != "" && *fEtcdSendLabel != "" { + // TODO check config format before write it + // Write config to etcd + err = c.LoadConfig(*fEtcdSendConfig) + if err != nil { + log.Fatal(err) + } + err = e.WriteLabelConfig(*fEtcdSendLabel, *fEtcdSendConfig) + if err != nil { + log.Fatal(err) + } + return + } else if *fEtcdEraseConfig { + // Erase config in etcd + err = e.DeleteConfig("") + if err != nil { + err = fmt.Errorf("Error erasing Telegraf Etcd Config: %s", err) + log.Fatal(err) + } + return + } else { + // Read config to etcd + log.Printf("Config read from etcd with labels %s\n", *fEtcdReadLabels) + c, err = e.ReadConfig(c, *fEtcdReadLabels) + if err != nil { + log.Fatal(err) + } + } + } else if *fConfig != "" { + // Read config from file c = config.NewConfig() c.OutputFilters = outputFilters c.InputFilters = inputFilters @@ -188,6 +295,7 @@ func main() { os.Exit(1) } + // Read config dir if *fConfigDirectoryLegacy != "" { err = c.LoadDirectory(*fConfigDirectoryLegacy) if err != nil { @@ -195,12 +303,14 @@ func main() { } } + // Read config dir if *fConfigDirectory != "" { err = c.LoadDirectory(*fConfigDirectory) if err != nil { log.Fatal(err) } } + // check config if len(c.Outputs) == 0 { log.Fatalf("Error: no outputs found, did you provide a valid config file?") } @@ -234,22 +344,6 @@ func main() { log.Fatal(err) } - shutdown := make(chan struct{}) - signals := make(chan os.Signal) - signal.Notify(signals, os.Interrupt, syscall.SIGHUP) - go func() { - sig := <-signals - if sig == os.Interrupt { - close(shutdown) - } - if sig == syscall.SIGHUP { - log.Printf("Reloading Telegraf config\n") - <-reload - reload <- true - close(shutdown) - } - }() - log.Printf("Starting Telegraf (version %s)\n", Version) log.Printf("Loaded outputs: %s", strings.Join(c.OutputNames(), " ")) log.Printf("Loaded inputs: %s", strings.Join(c.InputNames(), " ")) diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index 0afaa120f..0ab65008a 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -240,3 +240,194 @@ Outputs also support the same configurable options as inputs [outputs.influxdb.tagpass] cpu = ["cpu0"] ``` + +## Etcd Configuration + +### Command line parameters + +Telegraf option related to Etcd: +``` + -etcd etcd urls where configuration is stored (comma separated) + -etcdfolder etcd folder where configuration is stored and read + -etcdwriteconfigdir store the following config dir to etcd + -etcdwriteconfig store the following config file to etcd + -etcderaseconfig erase all telegraf config in etcd + -etcdwritelabel store config file to etcd with this label + -etcdreadlabels read config from etcd using labels (comma-separated) +``` + +### Features + +* Main config file can be loaded in Etcd +* Each agent try automatically to find its own key in Etcd (/telegraf/hosts/HOSTNAME) +* Labels can be configured in config files, so labels could be from Etcd +* Etcd config watcher: that reload Telegraf when a change is detected in Etcd. +* You can write all configuration of ALL your Telegraf agent in a folder then send it to Etcd. + +### Put simple configuration in Etcd + + +#### Create main configuration file + +Create a `myconf.conf` file, which will be stored in Etcd, with the following content: + +``` +[tags] + dc = "us-east-1" + +[agent] + interval = "10s" + round_interval = true + flush_interval = "10s" + flush_jitter = "0s" + debug = false + hostname = "" + + +[[outputs.influxdb]] + urls = ["http://localhost:8086"] + database = "telegraf" + precision = "s" + + +[[inputs.cpu]] + percpu = true + totalcpu = true + drop = ["cpu_time*"] +``` + +#### Send the configuration to Etcd + +Send this file to Etcd using the label **mylabel** + +``` +$ telegraf -etcd http://127.0.0.1:2379 -etcdwritelabel mylabel -etcdwriteconfig myconf.conf +2016/03/08 19:50:27 Config written with key /telegraf/labels/mylabel +``` + +NOTE: By default, configuration is stored in the folder `/telegraf` in Etcd. + You can change it using `-etcdfolder` flag + + +#### Check if your configuration in Etcd + +You can check if data is really written in Etcd with + +``` +$ etcdctl get /telegraf/labels/mylabel +[tags] + dc = "us-east-1" + +[agent] + interval = "10s" + round_interval = true + flush_interval = "10s" + flush_jitter = "0s" + debug = false + hostname = "" + + +[[outputs.influxdb]] + urls = ["http://localhost:8086"] + database = "telegraf" + precision = "s" + + +[[inputs.cpu]] + percpu = true + totalcpu = true + drop = ["cpu_time*"] +``` + +#### + +Now any telegraf agent can load this config use the label **mylabel** + +``` +$ telegraf -etcd http://127.0.0.1:2379 -etcdreadlabels mylabel +Config read with label mylabel +2016/03/08 19:54:52 WARNING: [etcd] 100: Key not found (/telegraf/main) [245] +2016/03/08 19:54:52 WARNING: [etcd] 100: Key not found (/telegraf/hosts) [245] +2016/03/08 19:54:52 Database creation failed: Get http://localhost:8086/query?db=&q=CREATE+DATABASE+IF+NOT+EXISTS+telegraf: dial tcp [::1]:8086: getsockopt: connection refused +2016/03/08 19:54:52 Starting Telegraf (version 0.10.4.1-49-g07a3a07) +2016/03/08 19:54:52 Loaded outputs: influxdb +2016/03/08 19:54:52 Loaded inputs: cpu +2016/03/08 19:54:52 Tags enabled: dc=us-east-1 host=hostname +2016/03/08 19:54:52 Agent Config: Interval:10s, Debug:false, Quiet:false, Hostname:"hostname", Flush Interval:10s +``` + +NOTE: By default, configuration is read in the folder `/telegraf` in Etcd. + You can change it using `-etcdfolder` flag + + +#### Read configuration from Etcd + +1. `/telegraf/main` key +2. `/telegraf/hosts/HOSTNAME` key +3. `/telegraf/labels/LABEL1` key +4. `/telegraf/labels/LABEL2` key + +(First means less importance) + +### Put configuration folder in Etcd + +#### Create configuration folder structure + +You can set your configuration in a folder like this: + +``` +testdata/test1 +├── hosts +│ ├── myserver1.conf +│ └── myserver2.conf +├── labels +│ ├── influx.conf +│ ├── network2.conf +│ └── network.conf +└── main.conf +``` + +The following table how files and folder are stored in Etcd: + +| Files | Etcd | Details +|-------------------------|---------------------|-----------------------------------------------------------------------| +| | `/telegraf ` | | +| `/main.conf` | `├── main ` | Equivalent to main config file load with `-config` option | +| | `├── hosts/ ` | | +| `/host/myserver1.conf` | `│ ├── myserver1` | Configuration specific for Telegraf agent with hostname **myserver1** | +| `/host/myserver2.conf` | `│ └── myserver2` | Configuration specific for Telegraf agent with hostname **myserver2** | +| | `└── labels/ ` | | +| `/labels/influx.conf` | `    ├── influx ` | Configuration loaded by hosts with etcd label **influx** | +| `/labels/network2.conf` | `    ├── network2 ` | Configuration loaded by hosts with etcd label **network2** | +| `/labels/network.conf` | `    └── network ` | Configuration loaded by hosts with etcd label **network** | + + +(An example is available [here](../internal/etcd/testdata/test1)) + +#### Send configuration to Etcd + +Then you can send your configuration folder to Etcd: + +``` +$ telegraf -etcd http://127.0.0.1:2379 -etcdwriteconfigdir internal/etcd/testdata/test1/ +2016/03/08 20:14:28 Config written with key /telegraf/hosts/myserver1 +2016/03/08 20:14:28 Config written with key /telegraf/labels/influx +2016/03/08 20:14:28 Config written with key /telegraf/labels/network +2016/03/08 20:14:28 Config written with key /telegraf/labels/network2 +2016/03/08 20:14:28 Config written with key /telegraf/main +``` +#### Read configuration from Etcd + +Then you can start all your telegraf agents like this + +``` +$ telegraf -etcd http://127.0.0.1:2379 -etcdreadlabels=influx,network +2016/03/08 20:17:25 Config read from etcd with labels influx,network +2016/03/08 20:17:25 Database creation failed: Get http://localhost:8086/query?db=&q=CREATE+DATABASE+IF+NOT+EXISTS+telegraf: dial tcp [::1]:8086: getsockopt: connection refused +2016/03/08 20:17:25 Starting Telegraf (version 0.10.4.1-49-g07a3a07) +2016/03/08 20:17:25 Loaded outputs: influxdb +2016/03/08 20:17:25 Loaded inputs: net +2016/03/08 20:17:25 Tags enabled: dc=us-east-1 host=myserver1 +2016/03/08 20:17:25 Agent Config: Interval:2s, Debug:false, Quiet:false, Hostname:"myserver1", Flush Interval:10s +2016/03/08 20:17:26 Gathered metrics, (2s interval), from 1 inputs in 1.382394ms +``` diff --git a/internal/config/config.go b/internal/config/config.go index 1e07234e8..5e9a2dbaf 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -57,6 +57,7 @@ func NewConfig() *Config { Agent: &AgentConfig{ Interval: internal.Duration{Duration: 10 * time.Second}, RoundInterval: true, + Labels: make([]string, 0), FlushInterval: internal.Duration{Duration: 10 * time.Second}, FlushJitter: internal.Duration{Duration: 5 * time.Second}, }, @@ -116,6 +117,8 @@ type AgentConfig struct { Quiet bool Hostname string OmitHostname bool + // Etcd labels + Labels []string } // Inputs returns a list of strings of the configured inputs. @@ -410,40 +413,48 @@ func (c *Config) LoadConfig(path string) error { if err != nil { return fmt.Errorf("Error parsing %s, %s", path, err) } + err = c.LoadConfigFromTable(tbl) + if err != nil { + return fmt.Errorf(err.Error(), path) + } + return nil +} +func (c *Config) LoadConfigFromTable(tbl *ast.Table) error { + var err error for name, val := range tbl.Fields { subTable, ok := val.(*ast.Table) if !ok { - return fmt.Errorf("%s: invalid configuration", path) + return fmt.Errorf("%%s: invalid configuration") } switch name { case "agent": if err = config.UnmarshalTable(subTable, c.Agent); err != nil { log.Printf("Could not parse [agent] config\n") - return fmt.Errorf("Error parsing %s, %s", path, err) + return fmt.Errorf("Error parsing %%s, %s", err) } case "global_tags", "tags": if err = config.UnmarshalTable(subTable, c.Tags); err != nil { log.Printf("Could not parse [global_tags] config\n") - return fmt.Errorf("Error parsing %s, %s", path, err) + return fmt.Errorf("Error parsing %%s, %s", err) } case "outputs": for pluginName, pluginVal := range subTable.Fields { switch pluginSubTable := pluginVal.(type) { case *ast.Table: if err = c.addOutput(pluginName, pluginSubTable); err != nil { - return fmt.Errorf("Error parsing %s, %s", path, err) + return fmt.Errorf("Error parsing %%s, %s", err) } case []*ast.Table: for _, t := range pluginSubTable { if err = c.addOutput(pluginName, t); err != nil { - return fmt.Errorf("Error parsing %s, %s", path, err) + return fmt.Errorf("Error parsing %%s, %s", err) } } default: - return fmt.Errorf("Unsupported config format: %s, file %s", - pluginName, path) + return fmt.Errorf("Unsupported config format: %s, file %%s", + pluginName) } } case "inputs", "plugins": @@ -451,24 +462,24 @@ func (c *Config) LoadConfig(path string) error { switch pluginSubTable := pluginVal.(type) { case *ast.Table: if err = c.addInput(pluginName, pluginSubTable); err != nil { - return fmt.Errorf("Error parsing %s, %s", path, err) + return fmt.Errorf("Error parsing %%s, %s", err) } case []*ast.Table: for _, t := range pluginSubTable { if err = c.addInput(pluginName, t); err != nil { - return fmt.Errorf("Error parsing %s, %s", path, err) + return fmt.Errorf("Error parsing %%s, %s", err) } } default: - return fmt.Errorf("Unsupported config format: %s, file %s", - pluginName, path) + return fmt.Errorf("Unsupported config format: %s, file %%s", + pluginName) } } // Assume it's an input input for legacy config file support if no other // identifiers are present default: if err = c.addInput(name, subTable); err != nil { - return fmt.Errorf("Error parsing %s, %s", path, err) + return fmt.Errorf("Error parsing %%s, %s", err) } } } diff --git a/internal/etcd/etcd.go b/internal/etcd/etcd.go new file mode 100644 index 000000000..09234ec93 --- /dev/null +++ b/internal/etcd/etcd.go @@ -0,0 +1,262 @@ +package etcd + +import ( + "golang.org/x/net/context" + "io/ioutil" + "log" + "os" + "path" + "strings" + "syscall" + "time" + + "github.com/coreos/etcd/client" + influxconfig "github.com/influxdata/config" + "github.com/influxdata/telegraf/internal/config" + "github.com/influxdata/toml" + "github.com/influxdata/toml/ast" +) + +type EtcdClient struct { + Kapi client.KeysAPI + Folder string +} + +func (e *EtcdClient) LaunchWatcher(shutdown chan struct{}, signals chan os.Signal) { + // TODO: All telegraf client will reload for each changes... + // Maybe we want to reload on those we need to ??? + // So we need to create a watcher by labels ?? + for { + watcherOpts := client.WatcherOptions{AfterIndex: 0, Recursive: true} + w := e.Kapi.Watcher(e.Folder, &watcherOpts) + r, err := w.Next(context.Background()) + if err != nil { + // TODO What we have to do here ???? + log.Fatal("Error occurred", err) + } + if r.Action == "set" || r.Action == "update" { + // do something with Response r here + log.Printf("Changes detected in etcd (%s action detected)\n", r.Action) + log.Print("Reloading Telegraf config\n") + signals <- syscall.SIGHUP + time.Sleep(time.Duration(1) * time.Second) + } + } +} + +func NewEtcdClient(urls string, folder string) *EtcdClient { + splittedUrls := strings.Split(urls, ",") + // Create a new etcd client + cfg := client.Config{ + Endpoints: splittedUrls, + Transport: client.DefaultTransport, + } + + e := &EtcdClient{} + c, err := client.New(cfg) + if err != nil { + log.Fatal(err) + } + kapi := client.NewKeysAPI(c) + + e.Kapi = kapi + e.Folder = folder + + return e +} + +func (e *EtcdClient) WriteConfigDir(configdir string) error { + directoryEntries, err := ioutil.ReadDir(configdir) + if err != nil { + return err + } + for _, entry := range directoryEntries { + name := entry.Name() + if entry.IsDir() { + if name == "labels" { + // Handle labels + directoryEntries, err := ioutil.ReadDir(path.Join(configdir, name)) + if err != nil { + return err + } + for _, entry := range directoryEntries { + filename := entry.Name() + if len(filename) < 6 || filename[len(filename)-5:] != ".conf" { + continue + } + label := filename[:len(filename)-5] + err = e.WriteLabelConfig(label, path.Join(configdir, name, filename)) + if err != nil { + return err + } + } + } else if name == "hosts" { + // Handle hosts specific config + directoryEntries, err := ioutil.ReadDir(path.Join(configdir, name)) + if err != nil { + return err + } + + for _, entry := range directoryEntries { + filename := entry.Name() + if len(filename) < 6 || filename[len(filename)-5:] != ".conf" { + continue + } + hostname := filename[:len(filename)-5] + err = e.WriteHostConfig(hostname, path.Join(configdir, name, filename)) + if err != nil { + return err + } + } + } + continue + } + if name == "main.conf" { + // Handle main config + err := e.WriteMainConfig(path.Join(configdir, name)) + if err != nil { + return err + } + } else { + continue + } + } + + return nil +} + +func (e *EtcdClient) DeleteConfig(path string) error { + // removeWrite main config file in etcd + key := e.Folder + "/" + path + _, err := e.Kapi.Delete(context.Background(), key, &client.DeleteOptions{Recursive: true}) + return err +} + +func (e *EtcdClient) WriteMainConfig(path string) error { + // Write main config file in etcd + key := "main" + err := e.WriteConfig(key, path) + return err +} + +func (e *EtcdClient) WriteLabelConfig(label string, path string) error { + // Write label config file in etcd + key := "labels/" + label + err := e.WriteConfig(key, path) + return err +} + +func (e *EtcdClient) WriteHostConfig(host string, path string) error { + // Write host config file in etcd + key := "hosts/" + host + err := e.WriteConfig(key, path) + return err +} + +func (e *EtcdClient) WriteConfig(relative_key string, path string) error { + // Read config file, get conf in tomlformat, convert to json + // Then write it to etcd + // Read file + tbl, err := influxconfig.ParseFile(path) + if err != nil { + return err + } + // Get toml + toml_data := tbl.Source() + // Write it + key := e.Folder + "/" + relative_key + resp, _ := e.Kapi.Get(context.Background(), key, nil) + if resp == nil { + _, err = e.Kapi.Set(context.Background(), key, string(toml_data), nil) + } else { + _, err = e.Kapi.Update(context.Background(), key, string(toml_data)) + } + if err != nil { + log.Fatal(err) + return err + } else { + log.Printf("Config written with key %s\n", key) + } + return nil +} + +//func (e *EtcdClient) ReadConfig(labels []string) (*config.Config, error) { +func (e *EtcdClient) ReadConfig(c *config.Config, labels string) (*config.Config, error) { + // Get default config in etcd + // key = /telegraf/default + key := e.Folder + "/main" + resp, err := e.Kapi.Get(context.Background(), key, nil) + if err != nil { + log.Printf("WARNING: [etcd] %s", err) + } else { + // Put it in toml + tbl, err := toml2table(resp) + if err != nil { + log.Printf("WARNING: [etcd] %s", err) + } + c.LoadConfigFromTable(tbl) + if err != nil { + log.Print(err, "") + } + } + + // Get specific host config + // key = /telegraf/hosts/HOSTNAME + hostname, err := os.Hostname() + if err != nil { + log.Printf("WARNING: [etcd] %s", err) + } else if hostname != "" { + key = e.Folder + "/hosts/" + hostname + resp, err := e.Kapi.Get(context.Background(), key, nil) + if err != nil { + log.Printf("WARNING: [etcd] %s", err) + } else { + // Put it in toml + tbl, err := toml2table(resp) + if err != nil { + log.Print(err) + } + c.LoadConfigFromTable(tbl) + if err != nil { + log.Print(err, "") + } + } + } + + // Concat labels from etcd and labels from command line + labels_list := c.Agent.Labels + if labels != "" { + labels_list = append(labels_list, strings.Split(labels, ",")...) + } + + // Iterate on all labels + // TODO check label order of importance ? + for _, label := range labels_list { + // Read from etcd + // key = /telegraf/labels/LABEL + key := e.Folder + "/labels/" + label + resp, err := e.Kapi.Get(context.Background(), key, nil) + if err != nil { + log.Print(err) + continue + } + // Put it in toml + tbl, err := toml2table(resp) + if err != nil { + log.Print(err) + continue + } + // Load config + err = c.LoadConfigFromTable(tbl) + if err != nil { + log.Print(err, "") + } + } + + return c, nil +} + +func toml2table(resp *client.Response) (*ast.Table, error) { + // Convert json to toml + return toml.Parse([]byte(resp.Node.Value)) +} diff --git a/internal/etcd/etcd_test.go b/internal/etcd/etcd_test.go new file mode 100644 index 000000000..40824d597 --- /dev/null +++ b/internal/etcd/etcd_test.go @@ -0,0 +1,195 @@ +package etcd + +import ( + "golang.org/x/net/context" + "io" + "os" + "syscall" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf/internal/config" + "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf/plugins/outputs" + + "github.com/influxdata/telegraf/plugins/inputs/system" + "github.com/influxdata/telegraf/plugins/outputs/influxdb" + + eclient "github.com/coreos/etcd/client" +) + +func Test1Write(t *testing.T) { + // Delete hostname conf file + hostname, _ := os.Hostname() + os.Remove("./testdata/test1/hosts/" + hostname + ".conf") + // Get etcd client + e := NewEtcdClient("http://localhost:2379", "/telegraf") + // Delete old conf from etcd + delOptions := &eclient.DeleteOptions{ + Recursive: true, + Dir: true, + } + e.Kapi.Delete(context.Background(), "/telegraf", delOptions) + + // Test write dir + err := e.WriteConfigDir("./testdata/test1") + require.NoError(t, err) + resp, _ := e.Kapi.Get(context.Background(), "/telegraf/main", nil) + assert.Equal(t, + "[tags]\n dc = \"us-east-1\"\n\n[agent]\n interval = \"2s\"\n round_interval = true\n flush_interval = \"10s\"\n flush_jitter = \"0s\"\n debug = false\n hostname = \"\"\n", + resp.Node.Value) + resp, _ = e.Kapi.Get(context.Background(), "/telegraf/hosts/localhost", nil) + assert.Equal(t, + "\n[agent]\n interval = \"2s\"\n labels = [\"influx\"]\n\n[[inputs.cpu]]\n percpu = true\n totalcpu = true\n drop = [\"cpu_time*\"]\n", + resp.Node.Value) + + // Test write conf + err = e.WriteLabelConfig("mylabel", "./testdata/test1/labels/network.conf") + require.NoError(t, err) + resp, _ = e.Kapi.Get(context.Background(), "/telegraf/labels/mylabel", nil) + assert.Equal(t, + "[[inputs.net]]\n\n", + resp.Node.Value) + + // Test read + c := config.NewConfig() + var inputFilters []string + var outputFilters []string + c.OutputFilters = outputFilters + c.InputFilters = inputFilters + + net := inputs.Inputs["net"]().(*system.NetIOStats) + influx := outputs.Outputs["influxdb"]().(*influxdb.InfluxDB) + influx.URLs = []string{"http://localhost:8086"} + influx.Database = "telegraf" + influx.Precision = "s" + + c, err = e.ReadConfig(c, "mylabel,influx") + require.NoError(t, err) + assert.Equal(t, net, c.Inputs[0].Input, + "Testdata did not produce a correct net struct.") + assert.Equal(t, influx, c.Outputs[0].Output, + "Testdata did not produce a correct influxdb struct.") + + // Test reload + shutdown := make(chan struct{}) + signals := make(chan os.Signal) + go e.LaunchWatcher(shutdown, signals) + // Test write conf + err = e.WriteLabelConfig("mylabel", "./testdata/test1/labels/network2.conf") + require.NoError(t, err) + resp, _ = e.Kapi.Get(context.Background(), "/telegraf/labels/mylabel", nil) + assert.Equal(t, + "[[inputs.net]]\n\n interfaces = [\"eth0\"]\n\n", + resp.Node.Value) + // TODO found a way to test reload .... + sig := <-signals + assert.Equal(t, syscall.SIGHUP, sig) + +} + +func Test2Error(t *testing.T) { + e := NewEtcdClient("http://localhost:2379", "/telegraf") + + // Test write dir + err := e.WriteConfigDir("./testdata/test2") + require.Error(t, err) +} + +func Test3Write(t *testing.T) { + // Delete old hostname conf file + hostname, _ := os.Hostname() + os.Remove("./testdata/test1/hosts/" + hostname + ".conf") + // Write host file + if hostname != "" { + r, err := os.Open("./testdata/test1/hosts/localhost.conf") + if err != nil { + panic(err) + } + defer r.Close() + + w, err := os.Create("./testdata/test1/hosts/" + hostname + ".conf") + if err != nil { + panic(err) + } + defer w.Close() + + // do the actual work + _, err = io.Copy(w, r) + if err != nil { + panic(err) + } + } + // Get tcd client + e := NewEtcdClient("http://localhost:2379", "/telegraf") + // Delete old conf from etcd + delOptions := &eclient.DeleteOptions{ + Recursive: true, + Dir: true, + } + e.Kapi.Delete(context.Background(), "/telegraf", delOptions) + + // Test write dir + err := e.WriteConfigDir("./testdata/test1") + require.NoError(t, err) + resp, _ := e.Kapi.Get(context.Background(), "/telegraf/main", nil) + assert.Equal(t, + "[tags]\n dc = \"us-east-1\"\n\n[agent]\n interval = \"2s\"\n round_interval = true\n flush_interval = \"10s\"\n flush_jitter = \"0s\"\n debug = false\n hostname = \"\"\n", + resp.Node.Value) + resp, _ = e.Kapi.Get(context.Background(), "/telegraf/hosts/localhost", nil) + assert.Equal(t, + "\n[agent]\n interval = \"2s\"\n labels = [\"influx\"]\n\n[[inputs.cpu]]\n percpu = true\n totalcpu = true\n drop = [\"cpu_time*\"]\n", + resp.Node.Value) + + // Test write conf + err = e.WriteLabelConfig("mylabel", "./testdata/test1/labels/network.conf") + require.NoError(t, err) + resp, _ = e.Kapi.Get(context.Background(), "/telegraf/labels/mylabel", nil) + assert.Equal(t, + "[[inputs.net]]\n\n", + resp.Node.Value) + + // Test read + c := config.NewConfig() + var inputFilters []string + var outputFilters []string + c.OutputFilters = outputFilters + c.InputFilters = inputFilters + + cpu := inputs.Inputs["cpu"]().(*system.CPUStats) + cpu.PerCPU = true + cpu.TotalCPU = true + net := inputs.Inputs["net"]().(*system.NetIOStats) + influx := outputs.Outputs["influxdb"]().(*influxdb.InfluxDB) + influx.URLs = []string{"http://localhost:8086"} + influx.Database = "telegraf" + influx.Precision = "s" + + c, err = e.ReadConfig(c, "mylabel,influx") + require.NoError(t, err) + assert.Equal(t, cpu, c.Inputs[0].Input, + "Testdata did not produce a correct net struct.") + assert.Equal(t, net, c.Inputs[1].Input, + "Testdata did not produce a correct net struct.") + assert.Equal(t, influx, c.Outputs[0].Output, + "Testdata did not produce a correct influxdb struct.") + + // Test reload + shutdown := make(chan struct{}) + signals := make(chan os.Signal) + go e.LaunchWatcher(shutdown, signals) + // Test write conf + err = e.WriteLabelConfig("mylabel", "./testdata/test1/labels/network2.conf") + require.NoError(t, err) + resp, _ = e.Kapi.Get(context.Background(), "/telegraf/labels/mylabel", nil) + assert.Equal(t, + "[[inputs.net]]\n\n interfaces = [\"eth0\"]\n\n", + resp.Node.Value) + // TODO found a way to test reload .... + sig := <-signals + assert.Equal(t, syscall.SIGHUP, sig) + // Delete hostname conf file + os.Remove("./testdata/test1/hosts/" + hostname + ".conf") +} diff --git a/internal/etcd/testdata/test1/hosts/localhost.conf b/internal/etcd/testdata/test1/hosts/localhost.conf new file mode 100644 index 000000000..d157dd2f5 --- /dev/null +++ b/internal/etcd/testdata/test1/hosts/localhost.conf @@ -0,0 +1,9 @@ + +[agent] + interval = "2s" + labels = ["influx"] + +[[inputs.cpu]] + percpu = true + totalcpu = true + drop = ["cpu_time*"] diff --git a/internal/etcd/testdata/test1/labels/influx.conf b/internal/etcd/testdata/test1/labels/influx.conf new file mode 100644 index 000000000..100ca0264 --- /dev/null +++ b/internal/etcd/testdata/test1/labels/influx.conf @@ -0,0 +1,4 @@ +[[outputs.influxdb]] + urls = ["http://localhost:8086"] + database = "telegraf" + precision = "s" diff --git a/internal/etcd/testdata/test1/labels/network.conf b/internal/etcd/testdata/test1/labels/network.conf new file mode 100644 index 000000000..848a9702f --- /dev/null +++ b/internal/etcd/testdata/test1/labels/network.conf @@ -0,0 +1,2 @@ +[[inputs.net]] + diff --git a/internal/etcd/testdata/test1/labels/network2.conf b/internal/etcd/testdata/test1/labels/network2.conf new file mode 100644 index 000000000..49ad3d94a --- /dev/null +++ b/internal/etcd/testdata/test1/labels/network2.conf @@ -0,0 +1,4 @@ +[[inputs.net]] + + interfaces = ["eth0"] + diff --git a/internal/etcd/testdata/test1/main.conf b/internal/etcd/testdata/test1/main.conf new file mode 100644 index 000000000..0adf151a8 --- /dev/null +++ b/internal/etcd/testdata/test1/main.conf @@ -0,0 +1,10 @@ +[tags] + dc = "us-east-1" + +[agent] + interval = "2s" + round_interval = true + flush_interval = "10s" + flush_jitter = "0s" + debug = false + hostname = "" diff --git a/internal/etcd/testdata/test2/main.conf b/internal/etcd/testdata/test2/main.conf new file mode 100644 index 000000000..9e85000fe --- /dev/null +++ b/internal/etcd/testdata/test2/main.conf @@ -0,0 +1,12 @@ +[tags] + dc = "us-east-1" + +[agent] + interval = "2s" + round_interval = true + flush_interval = "10s" + flush_jitter = "0s" + debug = false + hostname = "" + +sdag: sdg