Refactor the architecture of encoding parsers.

This commit is contained in:
Henry Hu 2016-02-05 20:31:46 +08:00
parent 14f2f36383
commit f0fe6b67af
11 changed files with 259 additions and 248 deletions

View File

@ -158,9 +158,8 @@ Currently implemented sources:
* disque * disque
* docker * docker
* elasticsearch * elasticsearch
* exec (generic line-protocol-emitting executable plugin, support JSON, influx and graphite) * exec (generic executable plugin, support JSON, influx and graphite)
* socket (generic line protocol listen input service, support influx and graphite) * socket (generic line protocol listen input service, support influx and graphite)
* tail (Plugin to tail the files to process line protocol contents, support influx and graphite)
* haproxy * haproxy
* httpjson (generic JSON-emitting http service plugin) * httpjson (generic JSON-emitting http service plugin)
* influxdb * influxdb

View File

@ -0,0 +1,31 @@
package encoding
import (
"fmt"
"github.com/influxdata/telegraf"
)
type Parser interface {
InitConfig(configs map[string]interface{}) error
Parse(buf []byte) ([]telegraf.Metric, error)
ParseLine(line string) (telegraf.Metric, error)
}
type Creator func() Parser
var Parsers = map[string]Creator{}
func Add(name string, creator Creator) {
Parsers[name] = creator
}
func NewParser(dataFormat string, configs map[string]interface{}) (parser Parser, err error) {
creator := Parsers[dataFormat]
if creator == nil {
return nil, fmt.Errorf("Unsupported data format: %s. ", dataFormat)
}
parser = creator()
err = parser.InitConfig(configs)
return parser, err
}

View File

@ -14,6 +14,7 @@ import (
"github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/models"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/encoding"
) )
// Minimum and maximum supported dates for timestamps. // Minimum and maximum supported dates for timestamps.
@ -22,31 +23,31 @@ var (
MaxDate = time.Date(2038, 1, 19, 0, 0, 0, 0, time.UTC) MaxDate = time.Date(2038, 1, 19, 0, 0, 0, 0, time.UTC)
) )
var defaultTemplate *template
func init() {
var err error
defaultTemplate, err = NewTemplate("measurement*", nil, DefaultSeparator)
if err != nil {
panic(err)
}
}
// Parser encapsulates a Graphite Parser.
type Parser struct {
matcher *matcher
}
// Options are configurable values that can be provided to a Parser // Options are configurable values that can be provided to a Parser
type Options struct { type Options struct {
Separator string Separator string
Templates []string Templates []string
} }
// NewParserWithOptions returns a graphite parser using the given options // Parser encapsulates a Graphite Parser.
func NewParserWithOptions(options Options) (*Parser, error) { type GraphiteParser struct {
matcher *matcher
}
func NewParser() *GraphiteParser {
return &GraphiteParser{}
}
func (p *GraphiteParser) InitConfig(configs map[string]interface{}) error {
var err error
options := Options{
Templates: configs["Templates"].([]string),
Separator: configs["Separator"].(string)}
matcher := newMatcher() matcher := newMatcher()
p.matcher = matcher
defaultTemplate, _ := NewTemplate("measurement*", nil, DefaultSeparator)
matcher.AddDefaultTemplate(defaultTemplate) matcher.AddDefaultTemplate(defaultTemplate)
for _, pattern := range options.Templates { for _, pattern := range options.Templates {
@ -76,25 +77,29 @@ func NewParserWithOptions(options Options) (*Parser, error) {
} }
} }
tmpl, err := NewTemplate(template, tags, options.Separator) tmpl, err1 := NewTemplate(template, tags, options.Separator)
if err != nil { if err1 != nil {
return nil, err err = err1
break
} }
matcher.Add(filter, tmpl) matcher.Add(filter, tmpl)
} }
return &Parser{matcher: matcher}, nil
if err != nil {
return fmt.Errorf("exec input parser config is error: %s ", err.Error())
} else {
return nil
}
} }
// NewParser returns a GraphiteParser instance. func init() {
func NewParser(templates []string) (*Parser, error) { encoding.Add("graphite", func() encoding.Parser {
return NewParserWithOptions( return NewParser()
Options{
Templates: templates,
Separator: DefaultSeparator,
}) })
} }
func (p *Parser) ParseMetrics(buf []byte) ([]telegraf.Metric, error) { func (p *GraphiteParser) Parse(buf []byte) ([]telegraf.Metric, error) {
// parse even if the buffer begins with a newline // parse even if the buffer begins with a newline
buf = bytes.TrimPrefix(buf, []byte("\n")) buf = bytes.TrimPrefix(buf, []byte("\n"))
@ -114,7 +119,7 @@ func (p *Parser) ParseMetrics(buf []byte) ([]telegraf.Metric, error) {
// Trim the buffer, even though there should be no padding // Trim the buffer, even though there should be no padding
line := strings.TrimSpace(string(buf)) line := strings.TrimSpace(string(buf))
if metric, err := p.Parse(line); err == nil { if metric, err := p.ParseLine(line); err == nil {
metrics = append(metrics, metric) metrics = append(metrics, metric)
} }
} }
@ -122,7 +127,7 @@ func (p *Parser) ParseMetrics(buf []byte) ([]telegraf.Metric, error) {
} }
// Parse performs Graphite parsing of a single line. // Parse performs Graphite parsing of a single line.
func (p *Parser) Parse(line string) (telegraf.Metric, error) { func (p *GraphiteParser) ParseLine(line string) (telegraf.Metric, error) {
// Break into 3 fields (name, value, timestamp). // Break into 3 fields (name, value, timestamp).
fields := strings.Fields(line) fields := strings.Fields(line)
if len(fields) != 2 && len(fields) != 3 { if len(fields) != 2 && len(fields) != 3 {
@ -184,7 +189,7 @@ func (p *Parser) Parse(line string) (telegraf.Metric, error) {
// ApplyTemplate extracts the template fields from the given line and // ApplyTemplate extracts the template fields from the given line and
// returns the measurement name and tags. // returns the measurement name and tags.
func (p *Parser) ApplyTemplate(line string) (string, map[string]string, string, error) { func (p *GraphiteParser) ApplyTemplate(line string) (string, map[string]string, string, error) {
// Break line into fields (name, value, timestamp), only name is used // Break line into fields (name, value, timestamp), only name is used
fields := strings.Fields(line) fields := strings.Fields(line)
if len(fields) == 0 { if len(fields) == 0 {

View File

@ -0,0 +1,48 @@
package influx
import (
"fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/encoding"
)
type InfluxParser struct {
}
func (p *InfluxParser) Parse(buf []byte) ([]telegraf.Metric, error) {
metrics, err := telegraf.ParseMetrics(buf)
if err != nil {
return nil, err
}
return metrics, nil
}
func (p *InfluxParser) ParseLine(line string) (telegraf.Metric, error) {
metrics, err := p.Parse([]byte(line + "\n"))
if err != nil {
return nil, err
}
if len(metrics) < 1 {
return nil, fmt.Errorf("Can not parse the line: %s, for data format: influx ", line)
}
return metrics[0], nil
}
func NewParser() *InfluxParser {
return &InfluxParser{}
}
func (p *InfluxParser) InitConfig(configs map[string]interface{}) error {
return nil
}
func init() {
encoding.Add("influx", func() encoding.Parser {
return NewParser()
})
}

View File

@ -0,0 +1,68 @@
package json
import (
"encoding/json"
"fmt"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/encoding"
)
type JsonParser struct {
}
func (p *JsonParser) Parse(buf []byte) ([]telegraf.Metric, error) {
metrics := make([]telegraf.Metric, 0)
var jsonOut interface{}
err := json.Unmarshal(buf, &jsonOut)
if err != nil {
err = fmt.Errorf("unable to parse out as JSON, %s", err)
return nil, err
}
f := internal.JSONFlattener{}
err = f.FlattenJSON("", jsonOut)
if err != nil {
return nil, err
}
metric, err := telegraf.NewMetric("exec", nil, f.Fields, time.Now().UTC())
if err != nil {
return nil, err
}
return append(metrics, metric), nil
}
func (p *JsonParser) ParseLine(line string) (telegraf.Metric, error) {
metrics, err := p.Parse([]byte(line + "\n"))
if err != nil {
return nil, err
}
if len(metrics) < 1 {
return nil, fmt.Errorf("Can not parse the line: %s, for data format: influx ", line)
}
return metrics[0], nil
}
func NewParser() *JsonParser {
return &JsonParser{}
}
func (p *JsonParser) InitConfig(configs map[string]interface{}) error {
return nil
}
func init() {
encoding.Add("json", func() encoding.Parser {
return NewParser()
})
}

View File

@ -1,90 +0,0 @@
package encoding
import (
"encoding/json"
"fmt"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/encoding/graphite"
)
type Parser struct {
graphiteParser *graphite.Parser
}
func NewParser(parser *graphite.Parser) *Parser {
return &Parser{graphiteParser: parser}
}
func (p *Parser) Parse(dataFormat string, out []byte, acc telegraf.Accumulator) error {
var err error
var metrics []telegraf.Metric
var metric telegraf.Metric
switch dataFormat {
case "", "json":
var jsonOut interface{}
err = json.Unmarshal(out, &jsonOut)
if err != nil {
err = fmt.Errorf("unable to parse out as JSON, %s", err)
break
}
f := internal.JSONFlattener{}
err = f.FlattenJSON("", jsonOut)
if err != nil {
break
}
acc.AddFields("exec", f.Fields, nil)
case "influx":
now := time.Now()
metrics, err = telegraf.ParseMetrics(out)
for _, metric = range metrics {
acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), now)
}
case "graphite":
metrics, err = p.graphiteParser.ParseMetrics(out)
for _, metric = range metrics {
acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
}
default:
err = fmt.Errorf("Unsupported data format: %s. Must be either json, influx or graphite ", dataFormat)
}
return err
}
func (p *Parser) ParseSocketLines(dataFormat string, buf []byte) ([]telegraf.Metric, error) {
var err error
var metrics []telegraf.Metric
switch dataFormat {
case "", "graphite":
metrics, err = p.graphiteParser.ParseMetrics(buf)
case "influx":
metrics, err = telegraf.ParseMetrics(buf)
default:
err = fmt.Errorf("Unsupported data format: %s. Must be either influx or graphite ", dataFormat)
}
if err != nil {
return nil, err
}
return metrics, nil
}
func (p *Parser) ParseSocketLine(dataFormat, line string) (telegraf.Metric, error) {
metrics, err := p.ParseSocketLines(dataFormat, []byte(line+"\n"))
if err != nil {
return nil, err
}
if len(metrics) < 1 {
return nil, fmt.Errorf("Can not parse the line: %s, for data format: %s ", line, dataFormat)
}
return metrics[0], nil
}

View File

@ -95,8 +95,11 @@ Now let's say we have the following configuration:
``` ```
[[inputs.exec]] [[inputs.exec]]
# the command to run # Shell/commands array
command = "/usr/bin/line_protocol_collector" # compatible with old version
# we can still use the old command configuration
# command = "/usr/bin/line_protocol_collector"
commands = ["/usr/bin/line_protocol_collector","/tmp/test2.sh"]
# Data format to consume. This can be "json" or "influx" (line-protocol) # Data format to consume. This can be "json" or "influx" (line-protocol)
# NOTE json only reads numerical measurements, strings and booleans are ignored. # NOTE json only reads numerical measurements, strings and booleans are ignored.

View File

@ -1,25 +0,0 @@
package exec
import (
"github.com/influxdata/telegraf/internal/encoding/graphite"
)
// Config represents the configuration for Graphite endpoints.
type Config struct {
Commands []string
graphite.Config
}
// New Config instance.
func NewConfig(commands, templates []string, separator string) *Config {
c := &Config{}
if separator == "" {
separator = graphite.DefaultSeparator
}
c.Commands = commands
c.Templates = templates
c.Separator = separator
return c
}

View File

@ -10,8 +10,11 @@ import (
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/encoding" "github.com/influxdata/telegraf/internal/encoding"
"github.com/influxdata/telegraf/internal/encoding/graphite"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
_ "github.com/influxdata/telegraf/internal/encoding/graphite"
_ "github.com/influxdata/telegraf/internal/encoding/influx"
_ "github.com/influxdata/telegraf/internal/encoding/json"
) )
const sampleConfig = ` const sampleConfig = `
@ -57,9 +60,7 @@ type Exec struct {
Separator string Separator string
Templates []string Templates []string
encodingParser *encoding.Parser encodingParser encoding.Parser
config *Config
initedConfig bool initedConfig bool
@ -107,8 +108,13 @@ func (e *Exec) ProcessCommand(command string, acc telegraf.Accumulator) {
return return
} }
if err = e.encodingParser.Parse(e.DataFormat, out, acc); err != nil { metrics, err := e.encodingParser.Parse(out)
if err != nil {
e.errc <- err e.errc <- err
} else {
for _, metric := range metrics {
acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
}
} }
} }
@ -120,23 +126,22 @@ func (e *Exec) initConfig() error {
e.Commands = []string{e.Command} e.Commands = []string{e.Command}
} }
c := NewConfig(e.Commands, e.Templates, e.Separator) if e.DataFormat == "" {
if err := c.Validate(); err != nil { e.DataFormat = "json"
return fmt.Errorf("exec configuration is error: %s ", err.Error())
} }
e.config = c
graphiteParser, err := graphite.NewParserWithOptions(graphite.Options{ var err error
Templates: e.config.Templates,
Separator: e.config.Separator}) configs := make(map[string]interface{})
configs["Separator"] = e.Separator
configs["Templates"] = e.Templates
e.encodingParser, err = encoding.NewParser(e.DataFormat, configs)
if err != nil { if err != nil {
return fmt.Errorf("exec input parser config is error: %s ", err.Error()) return fmt.Errorf("exec configuration is error: %s ", err.Error())
} }
e.encodingParser = encoding.NewParser(graphiteParser)
return nil return nil
} }

View File

@ -1,58 +0,0 @@
package socket
import "github.com/influxdata/telegraf/internal/encoding/graphite"
const (
// DefaultBindAddress is the default binding interface if none is specified.
DefaultBindAddress = ":2003"
// DefaultProtocol is the default IP protocol used by the Graphite input.
DefaultProtocol = "tcp"
// DefaultUDPReadBuffer is the default buffer size for the UDP listener.
// Sets the size of the operating system's receive buffer associated with
// the UDP traffic. Keep in mind that the OS must be able
// to handle the number set here or the UDP listener will error and exit.
//
// DefaultReadBuffer = 0 means to use the OS default, which is usually too
// small for high UDP performance.
//
// Increasing OS buffer limits:
// Linux: sudo sysctl -w net.core.rmem_max=<read-buffer>
// BSD/Darwin: sudo sysctl -w kern.ipc.maxsockbuf=<read-buffer>
DefaultUdpReadBuffer = 0
)
// Config represents the configuration for Graphite endpoints.
type Config struct {
BindAddress string
Protocol string
UdpReadBuffer int
graphite.Config
}
// New Config instance.
func NewConfig(bindAddress, protocol string, udpReadBuffer int, separator string, templates []string) *Config {
c := &Config{}
if bindAddress == "" {
bindAddress = DefaultBindAddress
}
if protocol == "" {
protocol = DefaultProtocol
}
if udpReadBuffer < 0 {
udpReadBuffer = DefaultUdpReadBuffer
}
if separator == "" {
separator = graphite.DefaultSeparator
}
c.BindAddress = bindAddress
c.Protocol = protocol
c.UdpReadBuffer = udpReadBuffer
c.Separator = separator
c.Templates = templates
return c
}

View File

@ -16,9 +16,31 @@ import (
"github.com/influxdata/telegraf/internal/encoding" "github.com/influxdata/telegraf/internal/encoding"
"github.com/influxdata/telegraf/internal/encoding/graphite" "github.com/influxdata/telegraf/internal/encoding/graphite"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
_ "github.com/influxdata/telegraf/internal/encoding/graphite"
_ "github.com/influxdata/telegraf/internal/encoding/influx"
) )
const ( const (
// DefaultBindAddress is the default binding interface if none is specified.
DefaultBindAddress = ":2003"
// DefaultProtocol is the default IP protocol used by the Graphite input.
DefaultProtocol = "tcp"
// DefaultUDPReadBuffer is the default buffer size for the UDP listener.
// Sets the size of the operating system's receive buffer associated with
// the UDP traffic. Keep in mind that the OS must be able
// to handle the number set here or the UDP listener will error and exit.
//
// DefaultReadBuffer = 0 means to use the OS default, which is usually too
// small for high UDP performance.
//
// Increasing OS buffer limits:
// Linux: sudo sysctl -w net.core.rmem_max=<read-buffer>
// BSD/Darwin: sudo sysctl -w kern.ipc.maxsockbuf=<read-buffer>
DefaultUdpReadBuffer = 0
udpBufferSize = 65536 udpBufferSize = 65536
) )
@ -44,10 +66,9 @@ type Socket struct {
mu sync.Mutex mu sync.Mutex
encodingParser *encoding.Parser encodingParser encoding.Parser
logger *log.Logger logger *log.Logger
config *Config
tcpConnectionsMu sync.Mutex tcpConnectionsMu sync.Mutex
tcpConnections map[string]*tcpConnection tcpConnections map[string]*tcpConnection
@ -105,39 +126,43 @@ func (s *Socket) Start() error {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
c := NewConfig(s.BindAddress, s.Protocol, s.UdpReadBuffer, s.Separator, s.Templates) if s.BindAddress == "" {
s.BindAddress = DefaultBindAddress
if err := c.Validate(); err != nil { }
return fmt.Errorf("Socket input configuration is error: %s ", err.Error()) if s.Protocol == "" {
s.Protocol = DefaultProtocol
}
if s.UdpReadBuffer < 0 {
s.UdpReadBuffer = DefaultUdpReadBuffer
} }
s.config = c
graphiteParser, err := graphite.NewParserWithOptions(graphite.Options{ configs := make(map[string]interface{})
Templates: s.config.Templates, configs["Separator"] = s.Separator
Separator: s.config.Separator}) configs["Templates"] = s.Templates
var err error
s.encodingParser, err = encoding.NewParser(s.DataFormat, configs)
if err != nil { if err != nil {
return fmt.Errorf("Socket input parser config is error: %s ", err.Error()) return fmt.Errorf("Socket input configuration is error: %s ", err.Error())
} }
s.encodingParser = encoding.NewParser(graphiteParser)
s.tcpConnections = make(map[string]*tcpConnection) s.tcpConnections = make(map[string]*tcpConnection)
s.done = make(chan struct{}) s.done = make(chan struct{})
s.metricC = make(chan telegraf.Metric, 50000) s.metricC = make(chan telegraf.Metric, 50000)
if strings.ToLower(s.config.Protocol) == "tcp" { if strings.ToLower(s.Protocol) == "tcp" {
s.addr, err = s.openTCPServer() s.addr, err = s.openTCPServer()
} else if strings.ToLower(s.config.Protocol) == "udp" { } else if strings.ToLower(s.Protocol) == "udp" {
s.addr, err = s.openUDPServer() s.addr, err = s.openUDPServer()
} else { } else {
return fmt.Errorf("unrecognized Socket input protocol %s", s.config.Protocol) return fmt.Errorf("unrecognized Socket input protocol %s", s.Protocol)
} }
if err != nil { if err != nil {
return err return err
} }
s.logger.Printf("Socket Plugin Listening on %s: %s", strings.ToUpper(s.config.Protocol), s.config.BindAddress) s.logger.Printf("Socket Plugin Listening on %s: %s", strings.ToUpper(s.Protocol), s.BindAddress)
return nil return nil
} }
@ -170,7 +195,7 @@ func (s *Socket) Stop() {
// openTCPServer opens the Socket input in TCP mode and starts processing data. // openTCPServer opens the Socket input in TCP mode and starts processing data.
func (s *Socket) openTCPServer() (net.Addr, error) { func (s *Socket) openTCPServer() (net.Addr, error) {
ln, err := net.Listen("tcp", s.config.BindAddress) ln, err := net.Listen("tcp", s.BindAddress)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -235,7 +260,7 @@ func (s *Socket) untrackConnection(c net.Conn) {
// openUDPServer opens the Socket input in UDP mode and starts processing incoming data. // openUDPServer opens the Socket input in UDP mode and starts processing incoming data.
func (s *Socket) openUDPServer() (net.Addr, error) { func (s *Socket) openUDPServer() (net.Addr, error) {
addr, err := net.ResolveUDPAddr("udp", s.config.BindAddress) addr, err := net.ResolveUDPAddr("udp", s.BindAddress)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -245,11 +270,11 @@ func (s *Socket) openUDPServer() (net.Addr, error) {
return nil, err return nil, err
} }
if s.config.UdpReadBuffer != 0 { if s.UdpReadBuffer != 0 {
err = s.udpConn.SetReadBuffer(s.config.UdpReadBuffer) err = s.udpConn.SetReadBuffer(s.UdpReadBuffer)
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to set UDP read buffer to %d: %s", return nil, fmt.Errorf("unable to set UDP read buffer to %d: %s",
s.config.UdpReadBuffer, err) s.UdpReadBuffer, err)
} }
} }
@ -276,7 +301,7 @@ func (s *Socket) handleLines(buf []byte) {
} }
// Parse it. // Parse it.
metrics, err := s.encodingParser.ParseSocketLines(s.DataFormat, buf) metrics, err := s.encodingParser.Parse(buf)
if err != nil { if err != nil {
switch err := err.(type) { switch err := err.(type) {
case *graphite.UnsupposedValueError: case *graphite.UnsupposedValueError: