package config import ( "errors" "fmt" "io/ioutil" "log" "path/filepath" "sort" "strings" "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal/models" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/serializers" "github.com/influxdata/config" "github.com/naoina/toml/ast" ) // Config specifies the URL/user/password for the database that telegraf // will be logging to, as well as all the plugins that the user has // specified type Config struct { Tags map[string]string InputFilters []string OutputFilters []string Agent *AgentConfig Inputs []*internal_models.RunningInput Outputs []*internal_models.RunningOutput } func NewConfig() *Config { c := &Config{ // Agent defaults: Agent: &AgentConfig{ Interval: internal.Duration{Duration: 10 * time.Second}, RoundInterval: true, FlushInterval: internal.Duration{Duration: 10 * time.Second}, FlushJitter: internal.Duration{Duration: 5 * time.Second}, }, Tags: make(map[string]string), Inputs: make([]*internal_models.RunningInput, 0), Outputs: make([]*internal_models.RunningOutput, 0), InputFilters: make([]string, 0), OutputFilters: make([]string, 0), } return c } type AgentConfig struct { // Interval at which to gather information Interval internal.Duration // RoundInterval rounds collection interval to 'interval'. // ie, if Interval=10s then always collect on :00, :10, :20, etc. RoundInterval bool // CollectionJitter is used to jitter the collection by a random amount. // Each plugin will sleep for a random time within jitter before collecting. // This can be used to avoid many plugins querying things like sysfs at the // same time, which can have a measurable effect on the system. CollectionJitter internal.Duration // FlushInterval is the Interval at which to flush data FlushInterval internal.Duration // FlushJitter Jitters the flush interval by a random amount. // This is primarily to avoid large write spikes for users running a large // number of telegraf instances. // ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s FlushJitter internal.Duration // MetricBufferLimit is the max number of metrics that each output plugin // will cache. The buffer is cleared when a successful write occurs. When // full, the oldest metrics will be overwritten. MetricBufferLimit int // FlushBufferWhenFull tells Telegraf to flush the metric buffer whenever // it fills up, regardless of FlushInterval. Setting this option to true // does _not_ deactivate FlushInterval. FlushBufferWhenFull bool // TODO(cam): Remove UTC and Precision parameters, they are no longer // valid for the agent config. Leaving them here for now for backwards- // compatability UTC bool `toml:"utc"` Precision string // Debug is the option for running in debug mode Debug bool // Quiet is the option for running in quiet mode Quiet bool Hostname string } // Inputs returns a list of strings of the configured inputs. func (c *Config) InputNames() []string { var name []string for _, input := range c.Inputs { name = append(name, input.Name) } return name } // Outputs returns a list of strings of the configured inputs. func (c *Config) OutputNames() []string { var name []string for _, output := range c.Outputs { name = append(name, output.Name) } return name } // ListTags returns a string of tags specified in the config, // line-protocol style func (c *Config) ListTags() string { var tags []string for k, v := range c.Tags { tags = append(tags, fmt.Sprintf("%s=%s", k, v)) } sort.Strings(tags) return strings.Join(tags, " ") } var header = `# Telegraf Configuration # Telegraf is entirely plugin driven. All metrics are gathered from the # declared inputs, and sent to the declared outputs. # Plugins must be declared in here to be active. # To deactivate a plugin, comment out the name and any variables. # Use 'telegraf -config telegraf.conf -test' to see what metrics a config # file would generate. # Global tags can be specified here in key="value" format. [global_tags] # dc = "us-east-1" # will tag all metrics with dc=us-east-1 # rack = "1a" # Configuration for telegraf agent [agent] ## Default data collection interval for all inputs interval = "10s" ## Rounds collection interval to 'interval' ## ie, if interval="10s" then always collect on :00, :10, :20, etc. round_interval = true ## Telegraf will cache metric_buffer_limit metrics for each output, and will ## flush this buffer on a successful write. metric_buffer_limit = 10000 ## Flush the buffer whenever full, regardless of flush_interval. flush_buffer_when_full = true ## Collection jitter is used to jitter the collection by a random amount. ## Each plugin will sleep for a random time within jitter before collecting. ## This can be used to avoid many plugins querying things like sysfs at the ## same time, which can have a measurable effect on the system. collection_jitter = "0s" ## Default flushing interval for all outputs. You shouldn't set this below ## interval. Maximum flush_interval will be flush_interval + flush_jitter flush_interval = "10s" ## Jitter the flush interval by a random amount. This is primarily to avoid ## large write spikes for users running a large number of telegraf instances. ## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s flush_jitter = "0s" ## Run telegraf in debug mode debug = false ## Run telegraf in quiet mode quiet = false ## Override default hostname, if empty use os.Hostname() hostname = "" ############################################################################### # OUTPUTS # ############################################################################### ` var pluginHeader = ` ############################################################################### # INPUTS # ############################################################################### ` var serviceInputHeader = ` ############################################################################### # SERVICE INPUTS # ############################################################################### ` // PrintSampleConfig prints the sample config func PrintSampleConfig(pluginFilters []string, outputFilters []string) { fmt.Printf(header) // Filter outputs var onames []string for oname := range outputs.Outputs { if len(outputFilters) == 0 || sliceContains(oname, outputFilters) { onames = append(onames, oname) } } sort.Strings(onames) // Print Outputs for _, oname := range onames { creator := outputs.Outputs[oname] output := creator() printConfig(oname, output, "outputs") } // Filter inputs var pnames []string for pname := range inputs.Inputs { if len(pluginFilters) == 0 || sliceContains(pname, pluginFilters) { pnames = append(pnames, pname) } } sort.Strings(pnames) // Print Inputs fmt.Printf(pluginHeader) servInputs := make(map[string]telegraf.ServiceInput) for _, pname := range pnames { creator := inputs.Inputs[pname] input := creator() switch p := input.(type) { case telegraf.ServiceInput: servInputs[pname] = p continue } printConfig(pname, input, "inputs") } // Print Service Inputs fmt.Printf(serviceInputHeader) for name, input := range servInputs { printConfig(name, input, "inputs") } } type printer interface { Description() string SampleConfig() string } func printConfig(name string, p printer, op string) { fmt.Printf("\n# %s\n[[%s.%s]]", p.Description(), op, name) config := p.SampleConfig() if config == "" { fmt.Printf("\n # no configuration\n") } else { fmt.Printf(config) } } func sliceContains(name string, list []string) bool { for _, b := range list { if b == name { return true } } return false } // PrintInputConfig prints the config usage of a single input. func PrintInputConfig(name string) error { if creator, ok := inputs.Inputs[name]; ok { printConfig(name, creator(), "inputs") } else { return errors.New(fmt.Sprintf("Input %s not found", name)) } return nil } // PrintOutputConfig prints the config usage of a single output. func PrintOutputConfig(name string) error { if creator, ok := outputs.Outputs[name]; ok { printConfig(name, creator(), "outputs") } else { return errors.New(fmt.Sprintf("Output %s not found", name)) } return nil } func (c *Config) LoadDirectory(path string) error { directoryEntries, err := ioutil.ReadDir(path) if err != nil { return err } for _, entry := range directoryEntries { if entry.IsDir() { continue } name := entry.Name() if len(name) < 6 || name[len(name)-5:] != ".conf" { continue } err := c.LoadConfig(filepath.Join(path, name)) if err != nil { return err } } return nil } // LoadConfig loads the given config file and applies it to c func (c *Config) LoadConfig(path string) error { tbl, err := config.ParseFile(path) if err != nil { return err } for name, val := range tbl.Fields { subTable, ok := val.(*ast.Table) if !ok { return errors.New("invalid configuration") } switch name { case "agent": if err = config.UnmarshalTable(subTable, c.Agent); err != nil { log.Printf("Could not parse [agent] config\n") return err } case "global_tags", "tags": if err = config.UnmarshalTable(subTable, c.Tags); err != nil { log.Printf("Could not parse [global_tags] config\n") return err } case "outputs": for pluginName, pluginVal := range subTable.Fields { switch pluginSubTable := pluginVal.(type) { case *ast.Table: if err = c.addOutput(pluginName, pluginSubTable); err != nil { return err } case []*ast.Table: for _, t := range pluginSubTable { if err = c.addOutput(pluginName, t); err != nil { return err } } default: return fmt.Errorf("Unsupported config format: %s", pluginName) } } case "inputs", "plugins": for pluginName, pluginVal := range subTable.Fields { switch pluginSubTable := pluginVal.(type) { case *ast.Table: if err = c.addInput(pluginName, pluginSubTable); err != nil { return err } case []*ast.Table: for _, t := range pluginSubTable { if err = c.addInput(pluginName, t); err != nil { return err } } default: return fmt.Errorf("Unsupported config format: %s", pluginName) } } // Assume it's an input input for legacy config file support if no other // identifiers are present default: if err = c.addInput(name, subTable); err != nil { return err } } } return nil } func (c *Config) addOutput(name string, table *ast.Table) error { if len(c.OutputFilters) > 0 && !sliceContains(name, c.OutputFilters) { return nil } creator, ok := outputs.Outputs[name] if !ok { return fmt.Errorf("Undefined but requested output: %s", name) } output := creator() // If the output has a SetSerializer function, then this means it can write // arbitrary types of output, so build the serializer and set it. switch t := output.(type) { case serializers.SerializerOutput: serializer, err := buildSerializer(name, table) if err != nil { return err } t.SetSerializer(serializer) } outputConfig, err := buildOutput(name, table) if err != nil { return err } if err := config.UnmarshalTable(table, output); err != nil { return err } ro := internal_models.NewRunningOutput(name, output, outputConfig) if c.Agent.MetricBufferLimit > 0 { ro.MetricBufferLimit = c.Agent.MetricBufferLimit } ro.FlushBufferWhenFull = c.Agent.FlushBufferWhenFull c.Outputs = append(c.Outputs, ro) return nil } func (c *Config) addInput(name string, table *ast.Table) error { if len(c.InputFilters) > 0 && !sliceContains(name, c.InputFilters) { return nil } // Legacy support renaming io input to diskio if name == "io" { name = "diskio" } creator, ok := inputs.Inputs[name] if !ok { return fmt.Errorf("Undefined but requested input: %s", name) } input := creator() // If the input has a SetParser function, then this means it can accept // arbitrary types of input, so build the parser and set it. switch t := input.(type) { case parsers.ParserInput: parser, err := buildParser(name, table) if err != nil { return err } t.SetParser(parser) } pluginConfig, err := buildInput(name, table) if err != nil { return err } if err := config.UnmarshalTable(table, input); err != nil { return err } rp := &internal_models.RunningInput{ Name: name, Input: input, Config: pluginConfig, } c.Inputs = append(c.Inputs, rp) return nil } // buildFilter builds a Filter (tagpass/tagdrop/pass/drop) to // be inserted into the internal_models.OutputConfig/internal_models.InputConfig to be used for prefix // filtering on tags and measurements func buildFilter(tbl *ast.Table) internal_models.Filter { f := internal_models.Filter{} if node, ok := tbl.Fields["pass"]; ok { if kv, ok := node.(*ast.KeyValue); ok { if ary, ok := kv.Value.(*ast.Array); ok { for _, elem := range ary.Value { if str, ok := elem.(*ast.String); ok { f.Pass = append(f.Pass, str.Value) f.IsActive = true } } } } } if node, ok := tbl.Fields["drop"]; ok { if kv, ok := node.(*ast.KeyValue); ok { if ary, ok := kv.Value.(*ast.Array); ok { for _, elem := range ary.Value { if str, ok := elem.(*ast.String); ok { f.Drop = append(f.Drop, str.Value) f.IsActive = true } } } } } if node, ok := tbl.Fields["tagpass"]; ok { if subtbl, ok := node.(*ast.Table); ok { for name, val := range subtbl.Fields { if kv, ok := val.(*ast.KeyValue); ok { tagfilter := &internal_models.TagFilter{Name: name} if ary, ok := kv.Value.(*ast.Array); ok { for _, elem := range ary.Value { if str, ok := elem.(*ast.String); ok { tagfilter.Filter = append(tagfilter.Filter, str.Value) } } } f.TagPass = append(f.TagPass, *tagfilter) f.IsActive = true } } } } if node, ok := tbl.Fields["tagdrop"]; ok { if subtbl, ok := node.(*ast.Table); ok { for name, val := range subtbl.Fields { if kv, ok := val.(*ast.KeyValue); ok { tagfilter := &internal_models.TagFilter{Name: name} if ary, ok := kv.Value.(*ast.Array); ok { for _, elem := range ary.Value { if str, ok := elem.(*ast.String); ok { tagfilter.Filter = append(tagfilter.Filter, str.Value) } } } f.TagDrop = append(f.TagDrop, *tagfilter) f.IsActive = true } } } } delete(tbl.Fields, "drop") delete(tbl.Fields, "pass") delete(tbl.Fields, "tagdrop") delete(tbl.Fields, "tagpass") return f } // buildInput parses input specific items from the ast.Table, // builds the filter and returns a // internal_models.InputConfig to be inserted into internal_models.RunningInput func buildInput(name string, tbl *ast.Table) (*internal_models.InputConfig, error) { cp := &internal_models.InputConfig{Name: name} if node, ok := tbl.Fields["interval"]; ok { if kv, ok := node.(*ast.KeyValue); ok { if str, ok := kv.Value.(*ast.String); ok { dur, err := time.ParseDuration(str.Value) if err != nil { return nil, err } cp.Interval = dur } } } if node, ok := tbl.Fields["name_prefix"]; ok { if kv, ok := node.(*ast.KeyValue); ok { if str, ok := kv.Value.(*ast.String); ok { cp.MeasurementPrefix = str.Value } } } if node, ok := tbl.Fields["name_suffix"]; ok { if kv, ok := node.(*ast.KeyValue); ok { if str, ok := kv.Value.(*ast.String); ok { cp.MeasurementSuffix = str.Value } } } if node, ok := tbl.Fields["name_override"]; ok { if kv, ok := node.(*ast.KeyValue); ok { if str, ok := kv.Value.(*ast.String); ok { cp.NameOverride = str.Value } } } cp.Tags = make(map[string]string) if node, ok := tbl.Fields["tags"]; ok { if subtbl, ok := node.(*ast.Table); ok { if err := config.UnmarshalTable(subtbl, cp.Tags); err != nil { log.Printf("Could not parse tags for input %s\n", name) } } } delete(tbl.Fields, "name_prefix") delete(tbl.Fields, "name_suffix") delete(tbl.Fields, "name_override") delete(tbl.Fields, "interval") delete(tbl.Fields, "tags") cp.Filter = buildFilter(tbl) return cp, nil } // buildParser grabs the necessary entries from the ast.Table for creating // a parsers.Parser object, and creates it, which can then be added onto // an Input object. func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) { c := &parsers.Config{} if node, ok := tbl.Fields["data_format"]; ok { if kv, ok := node.(*ast.KeyValue); ok { if str, ok := kv.Value.(*ast.String); ok { c.DataFormat = str.Value } } } // Legacy support, exec plugin originally parsed JSON by default. if name == "exec" && c.DataFormat == "" { c.DataFormat = "json" } else if c.DataFormat == "" { c.DataFormat = "influx" } if node, ok := tbl.Fields["separator"]; ok { if kv, ok := node.(*ast.KeyValue); ok { if str, ok := kv.Value.(*ast.String); ok { c.Separator = str.Value } } } if node, ok := tbl.Fields["templates"]; ok { if kv, ok := node.(*ast.KeyValue); ok { if ary, ok := kv.Value.(*ast.Array); ok { for _, elem := range ary.Value { if str, ok := elem.(*ast.String); ok { c.Templates = append(c.Templates, str.Value) } } } } } if node, ok := tbl.Fields["tag_keys"]; ok { if kv, ok := node.(*ast.KeyValue); ok { if ary, ok := kv.Value.(*ast.Array); ok { for _, elem := range ary.Value { if str, ok := elem.(*ast.String); ok { c.TagKeys = append(c.TagKeys, str.Value) } } } } } c.MetricName = name delete(tbl.Fields, "data_format") delete(tbl.Fields, "separator") delete(tbl.Fields, "templates") delete(tbl.Fields, "tag_keys") return parsers.NewParser(c) } // buildSerializer grabs the necessary entries from the ast.Table for creating // a serializers.Serializer object, and creates it, which can then be added onto // an Output object. func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error) { c := &serializers.Config{} if node, ok := tbl.Fields["data_format"]; ok { if kv, ok := node.(*ast.KeyValue); ok { if str, ok := kv.Value.(*ast.String); ok { c.DataFormat = str.Value } } } if c.DataFormat == "" { c.DataFormat = "influx" } if node, ok := tbl.Fields["prefix"]; ok { if kv, ok := node.(*ast.KeyValue); ok { if str, ok := kv.Value.(*ast.String); ok { c.Prefix = str.Value } } } delete(tbl.Fields, "data_format") delete(tbl.Fields, "prefix") return serializers.NewSerializer(c) } // buildOutput parses output specific items from the ast.Table, builds the filter and returns an // internal_models.OutputConfig to be inserted into internal_models.RunningInput // Note: error exists in the return for future calls that might require error func buildOutput(name string, tbl *ast.Table) (*internal_models.OutputConfig, error) { oc := &internal_models.OutputConfig{ Name: name, Filter: buildFilter(tbl), } return oc, nil }