telegraf/internal/config/config.go

836 lines
22 KiB
Go

package config
import (
"errors"
"fmt"
"io/ioutil"
"log"
"path/filepath"
"sort"
"strings"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/models"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/config"
"github.com/influxdata/toml/ast"
)
var (
// Default input plugins
inputDefaults = []string{"cpu", "mem", "swap", "system", "kernel",
"processes", "disk", "diskio"}
// Default output plugins
outputDefaults = []string{"influxdb"}
)
// Config specifies the URL/user/password for the database that telegraf
// will be logging to, as well as all the plugins that the user has
// specified
type Config struct {
Tags map[string]string
InputFilters []string
OutputFilters []string
Agent *AgentConfig
Inputs []*internal_models.RunningInput
Outputs []*internal_models.RunningOutput
}
func NewConfig() *Config {
c := &Config{
// Agent defaults:
Agent: &AgentConfig{
Interval: internal.Duration{Duration: 10 * time.Second},
RoundInterval: true,
FlushInterval: internal.Duration{Duration: 10 * time.Second},
FlushJitter: internal.Duration{Duration: 5 * time.Second},
},
Tags: make(map[string]string),
Inputs: make([]*internal_models.RunningInput, 0),
Outputs: make([]*internal_models.RunningOutput, 0),
InputFilters: make([]string, 0),
OutputFilters: make([]string, 0),
}
return c
}
type AgentConfig struct {
// Interval at which to gather information
Interval internal.Duration
// RoundInterval rounds collection interval to 'interval'.
// ie, if Interval=10s then always collect on :00, :10, :20, etc.
RoundInterval bool
// CollectionJitter is used to jitter the collection by a random amount.
// Each plugin will sleep for a random time within jitter before collecting.
// This can be used to avoid many plugins querying things like sysfs at the
// same time, which can have a measurable effect on the system.
CollectionJitter internal.Duration
// FlushInterval is the Interval at which to flush data
FlushInterval internal.Duration
// FlushJitter Jitters the flush interval by a random amount.
// This is primarily to avoid large write spikes for users running a large
// number of telegraf instances.
// ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
FlushJitter internal.Duration
// MetricBufferLimit is the max number of metrics that each output plugin
// will cache. The buffer is cleared when a successful write occurs. When
// full, the oldest metrics will be overwritten.
MetricBufferLimit int
// FlushBufferWhenFull tells Telegraf to flush the metric buffer whenever
// it fills up, regardless of FlushInterval. Setting this option to true
// does _not_ deactivate FlushInterval.
FlushBufferWhenFull bool
// TODO(cam): Remove UTC and Precision parameters, they are no longer
// valid for the agent config. Leaving them here for now for backwards-
// compatability
UTC bool `toml:"utc"`
Precision string
// Debug is the option for running in debug mode
Debug bool
// Quiet is the option for running in quiet mode
Quiet bool
Hostname string
OmitHostname bool
}
// Inputs returns a list of strings of the configured inputs.
func (c *Config) InputNames() []string {
var name []string
for _, input := range c.Inputs {
name = append(name, input.Name)
}
return name
}
// Outputs returns a list of strings of the configured inputs.
func (c *Config) OutputNames() []string {
var name []string
for _, output := range c.Outputs {
name = append(name, output.Name)
}
return name
}
// ListTags returns a string of tags specified in the config,
// line-protocol style
func (c *Config) ListTags() string {
var tags []string
for k, v := range c.Tags {
tags = append(tags, fmt.Sprintf("%s=%s", k, v))
}
sort.Strings(tags)
return strings.Join(tags, " ")
}
var header = `# Telegraf Configuration
#
# Telegraf is entirely plugin driven. All metrics are gathered from the
# declared inputs, and sent to the declared outputs.
#
# Plugins must be declared in here to be active.
# To deactivate a plugin, comment out the name and any variables.
#
# Use 'telegraf -config telegraf.conf -test' to see what metrics a config
# file would generate.
# Global tags can be specified here in key="value" format.
[global_tags]
# dc = "us-east-1" # will tag all metrics with dc=us-east-1
# rack = "1a"
# Configuration for telegraf agent
[agent]
## Default data collection interval for all inputs
interval = "10s"
## Rounds collection interval to 'interval'
## ie, if interval="10s" then always collect on :00, :10, :20, etc.
round_interval = true
## Telegraf will cache metric_buffer_limit metrics for each output, and will
## flush this buffer on a successful write.
metric_buffer_limit = 1000
## Flush the buffer whenever full, regardless of flush_interval.
flush_buffer_when_full = true
## Collection jitter is used to jitter the collection by a random amount.
## Each plugin will sleep for a random time within jitter before collecting.
## This can be used to avoid many plugins querying things like sysfs at the
## same time, which can have a measurable effect on the system.
collection_jitter = "0s"
## Default flushing interval for all outputs. You shouldn't set this below
## interval. Maximum flush_interval will be flush_interval + flush_jitter
flush_interval = "10s"
## Jitter the flush interval by a random amount. This is primarily to avoid
## large write spikes for users running a large number of telegraf instances.
## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
flush_jitter = "0s"
## Run telegraf in debug mode
debug = false
## Run telegraf in quiet mode
quiet = false
## Override default hostname, if empty use os.Hostname()
hostname = ""
## If set to true, do no set the "host" tag in the telegraf agent.
omit_hostname = false
###############################################################################
# OUTPUT PLUGINS #
###############################################################################
`
var inputHeader = `
###############################################################################
# INPUT PLUGINS #
###############################################################################
`
var serviceInputHeader = `
###############################################################################
# SERVICE INPUT PLUGINS #
###############################################################################
`
// PrintSampleConfig prints the sample config
func PrintSampleConfig(inputFilters []string, outputFilters []string) {
fmt.Printf(header)
if len(outputFilters) != 0 {
printFilteredOutputs(outputFilters, false)
} else {
printFilteredOutputs(outputDefaults, false)
// Print non-default outputs, commented
var pnames []string
for pname := range outputs.Outputs {
if !sliceContains(pname, outputDefaults) {
pnames = append(pnames, pname)
}
}
sort.Strings(pnames)
printFilteredOutputs(pnames, true)
}
fmt.Printf(inputHeader)
if len(inputFilters) != 0 {
printFilteredInputs(inputFilters, false)
} else {
printFilteredInputs(inputDefaults, false)
// Print non-default inputs, commented
var pnames []string
for pname := range inputs.Inputs {
if !sliceContains(pname, inputDefaults) {
pnames = append(pnames, pname)
}
}
sort.Strings(pnames)
printFilteredInputs(pnames, true)
}
}
func printFilteredInputs(inputFilters []string, commented bool) {
// Filter inputs
var pnames []string
for pname := range inputs.Inputs {
if sliceContains(pname, inputFilters) {
pnames = append(pnames, pname)
}
}
sort.Strings(pnames)
// Print Inputs
servInputs := make(map[string]telegraf.ServiceInput)
for _, pname := range pnames {
creator := inputs.Inputs[pname]
input := creator()
switch p := input.(type) {
case telegraf.ServiceInput:
servInputs[pname] = p
continue
}
printConfig(pname, input, "inputs", commented)
}
// Print Service Inputs
if len(servInputs) == 0 {
return
}
fmt.Printf(serviceInputHeader)
for name, input := range servInputs {
printConfig(name, input, "inputs", commented)
}
}
func printFilteredOutputs(outputFilters []string, commented bool) {
// Filter outputs
var onames []string
for oname := range outputs.Outputs {
if sliceContains(oname, outputFilters) {
onames = append(onames, oname)
}
}
sort.Strings(onames)
// Print Outputs
for _, oname := range onames {
creator := outputs.Outputs[oname]
output := creator()
printConfig(oname, output, "outputs", commented)
}
}
type printer interface {
Description() string
SampleConfig() string
}
func printConfig(name string, p printer, op string, commented bool) {
comment := ""
if commented {
comment = "# "
}
fmt.Printf("\n%s# %s\n%s[[%s.%s]]", comment, p.Description(), comment,
op, name)
config := p.SampleConfig()
if config == "" {
fmt.Printf("\n%s # no configuration\n\n", comment)
} else {
lines := strings.Split(config, "\n")
for i, line := range lines {
if i == 0 || i == len(lines)-1 {
fmt.Print("\n")
continue
}
fmt.Print(comment + line + "\n")
}
}
}
func sliceContains(name string, list []string) bool {
for _, b := range list {
if b == name {
return true
}
}
return false
}
// PrintInputConfig prints the config usage of a single input.
func PrintInputConfig(name string) error {
if creator, ok := inputs.Inputs[name]; ok {
printConfig(name, creator(), "inputs", false)
} else {
return errors.New(fmt.Sprintf("Input %s not found", name))
}
return nil
}
// PrintOutputConfig prints the config usage of a single output.
func PrintOutputConfig(name string) error {
if creator, ok := outputs.Outputs[name]; ok {
printConfig(name, creator(), "outputs", false)
} else {
return errors.New(fmt.Sprintf("Output %s not found", name))
}
return nil
}
func (c *Config) LoadDirectory(path string) error {
directoryEntries, err := ioutil.ReadDir(path)
if err != nil {
return err
}
for _, entry := range directoryEntries {
if entry.IsDir() {
continue
}
name := entry.Name()
if len(name) < 6 || name[len(name)-5:] != ".conf" {
continue
}
err := c.LoadConfig(filepath.Join(path, name))
if err != nil {
return err
}
}
return nil
}
// LoadConfig loads the given config file and applies it to c
func (c *Config) LoadConfig(path string) error {
tbl, err := config.ParseFile(path)
if err != nil {
return fmt.Errorf("Error parsing %s, %s", path, err)
}
for name, val := range tbl.Fields {
subTable, ok := val.(*ast.Table)
if !ok {
return fmt.Errorf("%s: invalid configuration", path)
}
switch name {
case "agent":
if err = config.UnmarshalTable(subTable, c.Agent); err != nil {
log.Printf("Could not parse [agent] config\n")
return fmt.Errorf("Error parsing %s, %s", path, err)
}
case "global_tags", "tags":
if err = config.UnmarshalTable(subTable, c.Tags); err != nil {
log.Printf("Could not parse [global_tags] config\n")
return fmt.Errorf("Error parsing %s, %s", path, err)
}
case "outputs":
for pluginName, pluginVal := range subTable.Fields {
switch pluginSubTable := pluginVal.(type) {
case *ast.Table:
if err = c.addOutput(pluginName, pluginSubTable); err != nil {
return fmt.Errorf("Error parsing %s, %s", path, err)
}
case []*ast.Table:
for _, t := range pluginSubTable {
if err = c.addOutput(pluginName, t); err != nil {
return fmt.Errorf("Error parsing %s, %s", path, err)
}
}
default:
return fmt.Errorf("Unsupported config format: %s, file %s",
pluginName, path)
}
}
case "inputs", "plugins":
for pluginName, pluginVal := range subTable.Fields {
switch pluginSubTable := pluginVal.(type) {
case *ast.Table:
if err = c.addInput(pluginName, pluginSubTable); err != nil {
return fmt.Errorf("Error parsing %s, %s", path, err)
}
case []*ast.Table:
for _, t := range pluginSubTable {
if err = c.addInput(pluginName, t); err != nil {
return fmt.Errorf("Error parsing %s, %s", path, err)
}
}
default:
return fmt.Errorf("Unsupported config format: %s, file %s",
pluginName, path)
}
}
// Assume it's an input input for legacy config file support if no other
// identifiers are present
default:
if err = c.addInput(name, subTable); err != nil {
return fmt.Errorf("Error parsing %s, %s", path, err)
}
}
}
return nil
}
func (c *Config) addOutput(name string, table *ast.Table) error {
if len(c.OutputFilters) > 0 && !sliceContains(name, c.OutputFilters) {
return nil
}
creator, ok := outputs.Outputs[name]
if !ok {
return fmt.Errorf("Undefined but requested output: %s", name)
}
output := creator()
// If the output has a SetSerializer function, then this means it can write
// arbitrary types of output, so build the serializer and set it.
switch t := output.(type) {
case serializers.SerializerOutput:
serializer, err := buildSerializer(name, table)
if err != nil {
return err
}
t.SetSerializer(serializer)
}
outputConfig, err := buildOutput(name, table)
if err != nil {
return err
}
if err := config.UnmarshalTable(table, output); err != nil {
return err
}
ro := internal_models.NewRunningOutput(name, output, outputConfig)
if c.Agent.MetricBufferLimit > 0 {
ro.MetricBufferLimit = c.Agent.MetricBufferLimit
}
ro.FlushBufferWhenFull = c.Agent.FlushBufferWhenFull
c.Outputs = append(c.Outputs, ro)
return nil
}
func (c *Config) addInput(name string, table *ast.Table) error {
if len(c.InputFilters) > 0 && !sliceContains(name, c.InputFilters) {
return nil
}
// Legacy support renaming io input to diskio
if name == "io" {
name = "diskio"
}
creator, ok := inputs.Inputs[name]
if !ok {
return fmt.Errorf("Undefined but requested input: %s", name)
}
input := creator()
// If the input has a SetParser function, then this means it can accept
// arbitrary types of input, so build the parser and set it.
switch t := input.(type) {
case parsers.ParserInput:
parser, err := buildParser(name, table)
if err != nil {
return err
}
t.SetParser(parser)
}
pluginConfig, err := buildInput(name, table)
if err != nil {
return err
}
if err := config.UnmarshalTable(table, input); err != nil {
return err
}
rp := &internal_models.RunningInput{
Name: name,
Input: input,
Config: pluginConfig,
}
c.Inputs = append(c.Inputs, rp)
return nil
}
// buildFilter builds a Filter
// (tagpass/tagdrop/namepass/namedrop/fieldpass/fielddrop) to
// be inserted into the internal_models.OutputConfig/internal_models.InputConfig to be used for prefix
// filtering on tags and measurements
func buildFilter(tbl *ast.Table) internal_models.Filter {
f := internal_models.Filter{}
if node, ok := tbl.Fields["namepass"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
f.NamePass = append(f.NamePass, str.Value)
f.IsActive = true
}
}
}
}
}
if node, ok := tbl.Fields["namedrop"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
f.NameDrop = append(f.NameDrop, str.Value)
f.IsActive = true
}
}
}
}
}
fields := []string{"pass", "fieldpass"}
for _, field := range fields {
if node, ok := tbl.Fields[field]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
f.FieldPass = append(f.FieldPass, str.Value)
f.IsActive = true
}
}
}
}
}
}
fields = []string{"drop", "fielddrop"}
for _, field := range fields {
if node, ok := tbl.Fields[field]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
f.FieldDrop = append(f.FieldDrop, str.Value)
f.IsActive = true
}
}
}
}
}
}
if node, ok := tbl.Fields["tagpass"]; ok {
if subtbl, ok := node.(*ast.Table); ok {
for name, val := range subtbl.Fields {
if kv, ok := val.(*ast.KeyValue); ok {
tagfilter := &internal_models.TagFilter{Name: name}
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
tagfilter.Filter = append(tagfilter.Filter, str.Value)
}
}
}
f.TagPass = append(f.TagPass, *tagfilter)
f.IsActive = true
}
}
}
}
if node, ok := tbl.Fields["tagdrop"]; ok {
if subtbl, ok := node.(*ast.Table); ok {
for name, val := range subtbl.Fields {
if kv, ok := val.(*ast.KeyValue); ok {
tagfilter := &internal_models.TagFilter{Name: name}
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
tagfilter.Filter = append(tagfilter.Filter, str.Value)
}
}
}
f.TagDrop = append(f.TagDrop, *tagfilter)
f.IsActive = true
}
}
}
}
delete(tbl.Fields, "namedrop")
delete(tbl.Fields, "namepass")
delete(tbl.Fields, "fielddrop")
delete(tbl.Fields, "fieldpass")
delete(tbl.Fields, "drop")
delete(tbl.Fields, "pass")
delete(tbl.Fields, "tagdrop")
delete(tbl.Fields, "tagpass")
return f
}
// buildInput parses input specific items from the ast.Table,
// builds the filter and returns a
// internal_models.InputConfig to be inserted into internal_models.RunningInput
func buildInput(name string, tbl *ast.Table) (*internal_models.InputConfig, error) {
cp := &internal_models.InputConfig{Name: name}
if node, ok := tbl.Fields["interval"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
dur, err := time.ParseDuration(str.Value)
if err != nil {
return nil, err
}
cp.Interval = dur
}
}
}
if node, ok := tbl.Fields["name_prefix"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
cp.MeasurementPrefix = str.Value
}
}
}
if node, ok := tbl.Fields["name_suffix"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
cp.MeasurementSuffix = str.Value
}
}
}
if node, ok := tbl.Fields["name_override"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
cp.NameOverride = str.Value
}
}
}
cp.Tags = make(map[string]string)
if node, ok := tbl.Fields["tags"]; ok {
if subtbl, ok := node.(*ast.Table); ok {
if err := config.UnmarshalTable(subtbl, cp.Tags); err != nil {
log.Printf("Could not parse tags for input %s\n", name)
}
}
}
delete(tbl.Fields, "name_prefix")
delete(tbl.Fields, "name_suffix")
delete(tbl.Fields, "name_override")
delete(tbl.Fields, "interval")
delete(tbl.Fields, "tags")
cp.Filter = buildFilter(tbl)
return cp, nil
}
// buildParser grabs the necessary entries from the ast.Table for creating
// a parsers.Parser object, and creates it, which can then be added onto
// an Input object.
func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
c := &parsers.Config{}
if node, ok := tbl.Fields["data_format"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.DataFormat = str.Value
}
}
}
// Legacy support, exec plugin originally parsed JSON by default.
if name == "exec" && c.DataFormat == "" {
c.DataFormat = "json"
} else if c.DataFormat == "" {
c.DataFormat = "influx"
}
if node, ok := tbl.Fields["separator"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.Separator = str.Value
}
}
}
if node, ok := tbl.Fields["templates"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
c.Templates = append(c.Templates, str.Value)
}
}
}
}
}
if node, ok := tbl.Fields["tag_keys"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
c.TagKeys = append(c.TagKeys, str.Value)
}
}
}
}
}
if node, ok := tbl.Fields["data_type"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.DataType = str.Value
}
}
}
c.MetricName = name
delete(tbl.Fields, "data_format")
delete(tbl.Fields, "separator")
delete(tbl.Fields, "templates")
delete(tbl.Fields, "tag_keys")
delete(tbl.Fields, "data_type")
return parsers.NewParser(c)
}
// buildSerializer grabs the necessary entries from the ast.Table for creating
// a serializers.Serializer object, and creates it, which can then be added onto
// an Output object.
func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error) {
c := &serializers.Config{}
if node, ok := tbl.Fields["data_format"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.DataFormat = str.Value
}
}
}
if c.DataFormat == "" {
c.DataFormat = "influx"
}
if node, ok := tbl.Fields["prefix"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.Prefix = str.Value
}
}
}
delete(tbl.Fields, "data_format")
delete(tbl.Fields, "prefix")
return serializers.NewSerializer(c)
}
// buildOutput parses output specific items from the ast.Table, builds the filter and returns an
// internal_models.OutputConfig to be inserted into internal_models.RunningInput
// Note: error exists in the return for future calls that might require error
func buildOutput(name string, tbl *ast.Table) (*internal_models.OutputConfig, error) {
oc := &internal_models.OutputConfig{
Name: name,
Filter: buildFilter(tbl),
}
// Outputs don't support FieldDrop/FieldPass, so set to NameDrop/NamePass
if len(oc.Filter.FieldDrop) > 0 {
oc.Filter.NameDrop = oc.Filter.FieldDrop
}
if len(oc.Filter.FieldPass) > 0 {
oc.Filter.NamePass = oc.Filter.FieldPass
}
return oc, nil
}