diff --git a/README.md b/README.md index 491de26c8..b44a78a6d 100644 --- a/README.md +++ b/README.md @@ -158,9 +158,7 @@ Currently implemented sources: * disque * docker * elasticsearch -* exec (generic line-protocol-emitting executable plugin, support JSON, influx and graphite) -* socket (generic line protocol listen input service, support influx and graphite) -* tail (Plugin to tail the files to process line protocol contents, support influx and graphite) +* exec (generic executable plugin, support JSON, influx and graphite) * haproxy * httpjson (generic JSON-emitting http service plugin) * influxdb diff --git a/internal/encoding/encoder.go b/internal/encoding/encoder.go new file mode 100644 index 000000000..129906ce5 --- /dev/null +++ b/internal/encoding/encoder.go @@ -0,0 +1,31 @@ +package encoding + +import ( + "fmt" + + "github.com/influxdata/telegraf" +) + +type Parser interface { + InitConfig(configs map[string]interface{}) error + Parse(buf []byte) ([]telegraf.Metric, error) + ParseLine(line string) (telegraf.Metric, error) +} + +type Creator func() Parser + +var Parsers = map[string]Creator{} + +func Add(name string, creator Creator) { + Parsers[name] = creator +} + +func NewParser(dataFormat string, configs map[string]interface{}) (parser Parser, err error) { + creator := Parsers[dataFormat] + if creator == nil { + return nil, fmt.Errorf("Unsupported data format: %s. ", dataFormat) + } + parser = creator() + err = parser.InitConfig(configs) + return parser, err +} diff --git a/internal/encoding/graphite/parser.go b/internal/encoding/graphite/parser.go index a86397b80..f43c76b87 100644 --- a/internal/encoding/graphite/parser.go +++ b/internal/encoding/graphite/parser.go @@ -14,6 +14,7 @@ import ( "github.com/influxdata/influxdb/models" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal/encoding" ) // Minimum and maximum supported dates for timestamps. @@ -22,31 +23,31 @@ var ( MaxDate = time.Date(2038, 1, 19, 0, 0, 0, 0, time.UTC) ) -var defaultTemplate *template - -func init() { - var err error - defaultTemplate, err = NewTemplate("measurement*", nil, DefaultSeparator) - if err != nil { - panic(err) - } -} - -// Parser encapsulates a Graphite Parser. -type Parser struct { - matcher *matcher -} - // Options are configurable values that can be provided to a Parser type Options struct { Separator string Templates []string } -// NewParserWithOptions returns a graphite parser using the given options -func NewParserWithOptions(options Options) (*Parser, error) { +// Parser encapsulates a Graphite Parser. +type GraphiteParser struct { + matcher *matcher +} + +func NewParser() *GraphiteParser { + return &GraphiteParser{} +} + +func (p *GraphiteParser) InitConfig(configs map[string]interface{}) error { + + var err error + options := Options{ + Templates: configs["Templates"].([]string), + Separator: configs["Separator"].(string)} matcher := newMatcher() + p.matcher = matcher + defaultTemplate, _ := NewTemplate("measurement*", nil, DefaultSeparator) matcher.AddDefaultTemplate(defaultTemplate) for _, pattern := range options.Templates { @@ -76,25 +77,29 @@ func NewParserWithOptions(options Options) (*Parser, error) { } } - tmpl, err := NewTemplate(template, tags, options.Separator) - if err != nil { - return nil, err + tmpl, err1 := NewTemplate(template, tags, options.Separator) + if err1 != nil { + err = err1 + break } matcher.Add(filter, tmpl) } - return &Parser{matcher: matcher}, nil + + if err != nil { + return fmt.Errorf("exec input parser config is error: %s ", err.Error()) + } else { + return nil + } + } -// NewParser returns a GraphiteParser instance. -func NewParser(templates []string) (*Parser, error) { - return NewParserWithOptions( - Options{ - Templates: templates, - Separator: DefaultSeparator, - }) +func init() { + encoding.Add("graphite", func() encoding.Parser { + return NewParser() + }) } -func (p *Parser) ParseMetrics(buf []byte) ([]telegraf.Metric, error) { +func (p *GraphiteParser) Parse(buf []byte) ([]telegraf.Metric, error) { // parse even if the buffer begins with a newline buf = bytes.TrimPrefix(buf, []byte("\n")) @@ -114,7 +119,7 @@ func (p *Parser) ParseMetrics(buf []byte) ([]telegraf.Metric, error) { // Trim the buffer, even though there should be no padding line := strings.TrimSpace(string(buf)) - if metric, err := p.Parse(line); err == nil { + if metric, err := p.ParseLine(line); err == nil { metrics = append(metrics, metric) } } @@ -122,7 +127,7 @@ func (p *Parser) ParseMetrics(buf []byte) ([]telegraf.Metric, error) { } // Parse performs Graphite parsing of a single line. -func (p *Parser) Parse(line string) (telegraf.Metric, error) { +func (p *GraphiteParser) ParseLine(line string) (telegraf.Metric, error) { // Break into 3 fields (name, value, timestamp). fields := strings.Fields(line) if len(fields) != 2 && len(fields) != 3 { @@ -184,7 +189,7 @@ func (p *Parser) Parse(line string) (telegraf.Metric, error) { // ApplyTemplate extracts the template fields from the given line and // returns the measurement name and tags. -func (p *Parser) ApplyTemplate(line string) (string, map[string]string, string, error) { +func (p *GraphiteParser) ApplyTemplate(line string) (string, map[string]string, string, error) { // Break line into fields (name, value, timestamp), only name is used fields := strings.Fields(line) if len(fields) == 0 { diff --git a/internal/encoding/influx/parser.go b/internal/encoding/influx/parser.go new file mode 100644 index 000000000..7965b69e2 --- /dev/null +++ b/internal/encoding/influx/parser.go @@ -0,0 +1,48 @@ +package influx + +import ( + "fmt" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal/encoding" +) + +type InfluxParser struct { +} + +func (p *InfluxParser) Parse(buf []byte) ([]telegraf.Metric, error) { + metrics, err := telegraf.ParseMetrics(buf) + + if err != nil { + return nil, err + } + return metrics, nil +} + +func (p *InfluxParser) ParseLine(line string) (telegraf.Metric, error) { + metrics, err := p.Parse([]byte(line + "\n")) + + if err != nil { + return nil, err + } + + if len(metrics) < 1 { + return nil, fmt.Errorf("Can not parse the line: %s, for data format: influx ", line) + } + + return metrics[0], nil +} + +func NewParser() *InfluxParser { + return &InfluxParser{} +} + +func (p *InfluxParser) InitConfig(configs map[string]interface{}) error { + return nil +} + +func init() { + encoding.Add("influx", func() encoding.Parser { + return NewParser() + }) +} diff --git a/internal/encoding/json/parser.go b/internal/encoding/json/parser.go new file mode 100644 index 000000000..69a91d14d --- /dev/null +++ b/internal/encoding/json/parser.go @@ -0,0 +1,68 @@ +package json + +import ( + "encoding/json" + "fmt" + "time" + + "github.com/influxdata/telegraf" + + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/internal/encoding" +) + +type JsonParser struct { +} + +func (p *JsonParser) Parse(buf []byte) ([]telegraf.Metric, error) { + + metrics := make([]telegraf.Metric, 0) + + var jsonOut interface{} + err := json.Unmarshal(buf, &jsonOut) + if err != nil { + err = fmt.Errorf("unable to parse out as JSON, %s", err) + return nil, err + } + + f := internal.JSONFlattener{} + err = f.FlattenJSON("", jsonOut) + if err != nil { + return nil, err + } + + metric, err := telegraf.NewMetric("exec", nil, f.Fields, time.Now().UTC()) + + if err != nil { + return nil, err + } + return append(metrics, metric), nil +} + +func (p *JsonParser) ParseLine(line string) (telegraf.Metric, error) { + metrics, err := p.Parse([]byte(line + "\n")) + + if err != nil { + return nil, err + } + + if len(metrics) < 1 { + return nil, fmt.Errorf("Can not parse the line: %s, for data format: influx ", line) + } + + return metrics[0], nil +} + +func NewParser() *JsonParser { + return &JsonParser{} +} + +func (p *JsonParser) InitConfig(configs map[string]interface{}) error { + return nil +} + +func init() { + encoding.Add("json", func() encoding.Parser { + return NewParser() + }) +} diff --git a/internal/encoding/parser.go b/internal/encoding/parser.go deleted file mode 100644 index dc73bfe68..000000000 --- a/internal/encoding/parser.go +++ /dev/null @@ -1,90 +0,0 @@ -package encoding - -import ( - "encoding/json" - "fmt" - "time" - - "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/internal" - "github.com/influxdata/telegraf/internal/encoding/graphite" -) - -type Parser struct { - graphiteParser *graphite.Parser -} - -func NewParser(parser *graphite.Parser) *Parser { - return &Parser{graphiteParser: parser} -} - -func (p *Parser) Parse(dataFormat string, out []byte, acc telegraf.Accumulator) error { - var err error - var metrics []telegraf.Metric - var metric telegraf.Metric - - switch dataFormat { - case "", "json": - var jsonOut interface{} - err = json.Unmarshal(out, &jsonOut) - if err != nil { - err = fmt.Errorf("unable to parse out as JSON, %s", err) - break - } - - f := internal.JSONFlattener{} - err = f.FlattenJSON("", jsonOut) - if err != nil { - break - } - acc.AddFields("exec", f.Fields, nil) - case "influx": - now := time.Now() - metrics, err = telegraf.ParseMetrics(out) - for _, metric = range metrics { - acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), now) - } - case "graphite": - metrics, err = p.graphiteParser.ParseMetrics(out) - for _, metric = range metrics { - acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time()) - } - default: - err = fmt.Errorf("Unsupported data format: %s. Must be either json, influx or graphite ", dataFormat) - } - - return err -} - -func (p *Parser) ParseSocketLines(dataFormat string, buf []byte) ([]telegraf.Metric, error) { - var err error - var metrics []telegraf.Metric - - switch dataFormat { - case "", "graphite": - metrics, err = p.graphiteParser.ParseMetrics(buf) - case "influx": - metrics, err = telegraf.ParseMetrics(buf) - default: - err = fmt.Errorf("Unsupported data format: %s. Must be either influx or graphite ", dataFormat) - } - - if err != nil { - return nil, err - } - return metrics, nil -} - -func (p *Parser) ParseSocketLine(dataFormat, line string) (telegraf.Metric, error) { - metrics, err := p.ParseSocketLines(dataFormat, []byte(line+"\n")) - - if err != nil { - return nil, err - } - - if len(metrics) < 1 { - return nil, fmt.Errorf("Can not parse the line: %s, for data format: %s ", line, dataFormat) - } - - return metrics[0], nil -} diff --git a/plugins/inputs/exec/README.md b/plugins/inputs/exec/README.md index a58b3dde6..4b862b273 100644 --- a/plugins/inputs/exec/README.md +++ b/plugins/inputs/exec/README.md @@ -95,8 +95,11 @@ Now let's say we have the following configuration: ``` [[inputs.exec]] - # the command to run - command = "/usr/bin/line_protocol_collector" + # Shell/commands array + # compatible with old version + # we can still use the old command configuration + # command = "/usr/bin/line_protocol_collector" + commands = ["/usr/bin/line_protocol_collector","/tmp/test2.sh"] # Data format to consume. This can be "json" or "influx" (line-protocol) # NOTE json only reads numerical measurements, strings and booleans are ignored. diff --git a/plugins/inputs/exec/config.go b/plugins/inputs/exec/config.go deleted file mode 100644 index c4ad5d47f..000000000 --- a/plugins/inputs/exec/config.go +++ /dev/null @@ -1,25 +0,0 @@ -package exec - -import ( - "github.com/influxdata/telegraf/internal/encoding/graphite" -) - -// Config represents the configuration for Graphite endpoints. -type Config struct { - Commands []string - graphite.Config -} - -// New Config instance. -func NewConfig(commands, templates []string, separator string) *Config { - c := &Config{} - if separator == "" { - separator = graphite.DefaultSeparator - } - - c.Commands = commands - c.Templates = templates - c.Separator = separator - - return c -} diff --git a/plugins/inputs/exec/exec.go b/plugins/inputs/exec/exec.go index efccc7033..a53a6f32d 100644 --- a/plugins/inputs/exec/exec.go +++ b/plugins/inputs/exec/exec.go @@ -10,8 +10,11 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal/encoding" - "github.com/influxdata/telegraf/internal/encoding/graphite" "github.com/influxdata/telegraf/plugins/inputs" + + _ "github.com/influxdata/telegraf/internal/encoding/graphite" + _ "github.com/influxdata/telegraf/internal/encoding/influx" + _ "github.com/influxdata/telegraf/internal/encoding/json" ) const sampleConfig = ` @@ -57,9 +60,7 @@ type Exec struct { Separator string Templates []string - encodingParser *encoding.Parser - - config *Config + encodingParser encoding.Parser initedConfig bool @@ -107,8 +108,13 @@ func (e *Exec) ProcessCommand(command string, acc telegraf.Accumulator) { return } - if err = e.encodingParser.Parse(e.DataFormat, out, acc); err != nil { + metrics, err := e.encodingParser.Parse(out) + if err != nil { e.errc <- err + } else { + for _, metric := range metrics { + acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time()) + } } } @@ -120,23 +126,22 @@ func (e *Exec) initConfig() error { e.Commands = []string{e.Command} } - c := NewConfig(e.Commands, e.Templates, e.Separator) - if err := c.Validate(); err != nil { - return fmt.Errorf("exec configuration is error: %s ", err.Error()) - + if e.DataFormat == "" { + e.DataFormat = "json" } - e.config = c - graphiteParser, err := graphite.NewParserWithOptions(graphite.Options{ - Templates: e.config.Templates, - Separator: e.config.Separator}) + var err error + + configs := make(map[string]interface{}) + configs["Separator"] = e.Separator + configs["Templates"] = e.Templates + + e.encodingParser, err = encoding.NewParser(e.DataFormat, configs) if err != nil { - return fmt.Errorf("exec input parser config is error: %s ", err.Error()) + return fmt.Errorf("exec configuration is error: %s ", err.Error()) } - e.encodingParser = encoding.NewParser(graphiteParser) - return nil }