From f0fe6b67af317aa621f1a4a32fbc3b7df0f5436d Mon Sep 17 00:00:00 2001 From: Henry Hu Date: Fri, 5 Feb 2016 20:31:46 +0800 Subject: [PATCH] Refactor the architecture of encoding parsers. --- README.md | 3 +- internal/encoding/encoder.go | 31 ++++++++++ internal/encoding/graphite/parser.go | 69 +++++++++++---------- internal/encoding/influx/parser.go | 48 +++++++++++++++ internal/encoding/json/parser.go | 68 +++++++++++++++++++++ internal/encoding/parser.go | 90 ---------------------------- plugins/inputs/exec/README.md | 7 ++- plugins/inputs/exec/config.go | 25 -------- plugins/inputs/exec/exec.go | 37 +++++++----- plugins/inputs/socket/config.go | 58 ------------------ plugins/inputs/socket/socket.go | 71 +++++++++++++++------- 11 files changed, 259 insertions(+), 248 deletions(-) create mode 100644 internal/encoding/encoder.go create mode 100644 internal/encoding/influx/parser.go create mode 100644 internal/encoding/json/parser.go delete mode 100644 internal/encoding/parser.go delete mode 100644 plugins/inputs/exec/config.go delete mode 100644 plugins/inputs/socket/config.go diff --git a/README.md b/README.md index 491de26c8..947408815 100644 --- a/README.md +++ b/README.md @@ -158,9 +158,8 @@ Currently implemented sources: * disque * docker * elasticsearch -* exec (generic line-protocol-emitting executable plugin, support JSON, influx and graphite) +* exec (generic executable plugin, support JSON, influx and graphite) * socket (generic line protocol listen input service, support influx and graphite) -* tail (Plugin to tail the files to process line protocol contents, support influx and graphite) * haproxy * httpjson (generic JSON-emitting http service plugin) * influxdb diff --git a/internal/encoding/encoder.go b/internal/encoding/encoder.go new file mode 100644 index 000000000..129906ce5 --- /dev/null +++ b/internal/encoding/encoder.go @@ -0,0 +1,31 @@ +package encoding + +import ( + "fmt" + + "github.com/influxdata/telegraf" +) + +type Parser interface { + InitConfig(configs map[string]interface{}) error + Parse(buf []byte) ([]telegraf.Metric, error) + ParseLine(line string) (telegraf.Metric, error) +} + +type Creator func() Parser + +var Parsers = map[string]Creator{} + +func Add(name string, creator Creator) { + Parsers[name] = creator +} + +func NewParser(dataFormat string, configs map[string]interface{}) (parser Parser, err error) { + creator := Parsers[dataFormat] + if creator == nil { + return nil, fmt.Errorf("Unsupported data format: %s. ", dataFormat) + } + parser = creator() + err = parser.InitConfig(configs) + return parser, err +} diff --git a/internal/encoding/graphite/parser.go b/internal/encoding/graphite/parser.go index a86397b80..f43c76b87 100644 --- a/internal/encoding/graphite/parser.go +++ b/internal/encoding/graphite/parser.go @@ -14,6 +14,7 @@ import ( "github.com/influxdata/influxdb/models" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal/encoding" ) // Minimum and maximum supported dates for timestamps. @@ -22,31 +23,31 @@ var ( MaxDate = time.Date(2038, 1, 19, 0, 0, 0, 0, time.UTC) ) -var defaultTemplate *template - -func init() { - var err error - defaultTemplate, err = NewTemplate("measurement*", nil, DefaultSeparator) - if err != nil { - panic(err) - } -} - -// Parser encapsulates a Graphite Parser. -type Parser struct { - matcher *matcher -} - // Options are configurable values that can be provided to a Parser type Options struct { Separator string Templates []string } -// NewParserWithOptions returns a graphite parser using the given options -func NewParserWithOptions(options Options) (*Parser, error) { +// Parser encapsulates a Graphite Parser. +type GraphiteParser struct { + matcher *matcher +} + +func NewParser() *GraphiteParser { + return &GraphiteParser{} +} + +func (p *GraphiteParser) InitConfig(configs map[string]interface{}) error { + + var err error + options := Options{ + Templates: configs["Templates"].([]string), + Separator: configs["Separator"].(string)} matcher := newMatcher() + p.matcher = matcher + defaultTemplate, _ := NewTemplate("measurement*", nil, DefaultSeparator) matcher.AddDefaultTemplate(defaultTemplate) for _, pattern := range options.Templates { @@ -76,25 +77,29 @@ func NewParserWithOptions(options Options) (*Parser, error) { } } - tmpl, err := NewTemplate(template, tags, options.Separator) - if err != nil { - return nil, err + tmpl, err1 := NewTemplate(template, tags, options.Separator) + if err1 != nil { + err = err1 + break } matcher.Add(filter, tmpl) } - return &Parser{matcher: matcher}, nil + + if err != nil { + return fmt.Errorf("exec input parser config is error: %s ", err.Error()) + } else { + return nil + } + } -// NewParser returns a GraphiteParser instance. -func NewParser(templates []string) (*Parser, error) { - return NewParserWithOptions( - Options{ - Templates: templates, - Separator: DefaultSeparator, - }) +func init() { + encoding.Add("graphite", func() encoding.Parser { + return NewParser() + }) } -func (p *Parser) ParseMetrics(buf []byte) ([]telegraf.Metric, error) { +func (p *GraphiteParser) Parse(buf []byte) ([]telegraf.Metric, error) { // parse even if the buffer begins with a newline buf = bytes.TrimPrefix(buf, []byte("\n")) @@ -114,7 +119,7 @@ func (p *Parser) ParseMetrics(buf []byte) ([]telegraf.Metric, error) { // Trim the buffer, even though there should be no padding line := strings.TrimSpace(string(buf)) - if metric, err := p.Parse(line); err == nil { + if metric, err := p.ParseLine(line); err == nil { metrics = append(metrics, metric) } } @@ -122,7 +127,7 @@ func (p *Parser) ParseMetrics(buf []byte) ([]telegraf.Metric, error) { } // Parse performs Graphite parsing of a single line. -func (p *Parser) Parse(line string) (telegraf.Metric, error) { +func (p *GraphiteParser) ParseLine(line string) (telegraf.Metric, error) { // Break into 3 fields (name, value, timestamp). fields := strings.Fields(line) if len(fields) != 2 && len(fields) != 3 { @@ -184,7 +189,7 @@ func (p *Parser) Parse(line string) (telegraf.Metric, error) { // ApplyTemplate extracts the template fields from the given line and // returns the measurement name and tags. -func (p *Parser) ApplyTemplate(line string) (string, map[string]string, string, error) { +func (p *GraphiteParser) ApplyTemplate(line string) (string, map[string]string, string, error) { // Break line into fields (name, value, timestamp), only name is used fields := strings.Fields(line) if len(fields) == 0 { diff --git a/internal/encoding/influx/parser.go b/internal/encoding/influx/parser.go new file mode 100644 index 000000000..7965b69e2 --- /dev/null +++ b/internal/encoding/influx/parser.go @@ -0,0 +1,48 @@ +package influx + +import ( + "fmt" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal/encoding" +) + +type InfluxParser struct { +} + +func (p *InfluxParser) Parse(buf []byte) ([]telegraf.Metric, error) { + metrics, err := telegraf.ParseMetrics(buf) + + if err != nil { + return nil, err + } + return metrics, nil +} + +func (p *InfluxParser) ParseLine(line string) (telegraf.Metric, error) { + metrics, err := p.Parse([]byte(line + "\n")) + + if err != nil { + return nil, err + } + + if len(metrics) < 1 { + return nil, fmt.Errorf("Can not parse the line: %s, for data format: influx ", line) + } + + return metrics[0], nil +} + +func NewParser() *InfluxParser { + return &InfluxParser{} +} + +func (p *InfluxParser) InitConfig(configs map[string]interface{}) error { + return nil +} + +func init() { + encoding.Add("influx", func() encoding.Parser { + return NewParser() + }) +} diff --git a/internal/encoding/json/parser.go b/internal/encoding/json/parser.go new file mode 100644 index 000000000..69a91d14d --- /dev/null +++ b/internal/encoding/json/parser.go @@ -0,0 +1,68 @@ +package json + +import ( + "encoding/json" + "fmt" + "time" + + "github.com/influxdata/telegraf" + + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/internal/encoding" +) + +type JsonParser struct { +} + +func (p *JsonParser) Parse(buf []byte) ([]telegraf.Metric, error) { + + metrics := make([]telegraf.Metric, 0) + + var jsonOut interface{} + err := json.Unmarshal(buf, &jsonOut) + if err != nil { + err = fmt.Errorf("unable to parse out as JSON, %s", err) + return nil, err + } + + f := internal.JSONFlattener{} + err = f.FlattenJSON("", jsonOut) + if err != nil { + return nil, err + } + + metric, err := telegraf.NewMetric("exec", nil, f.Fields, time.Now().UTC()) + + if err != nil { + return nil, err + } + return append(metrics, metric), nil +} + +func (p *JsonParser) ParseLine(line string) (telegraf.Metric, error) { + metrics, err := p.Parse([]byte(line + "\n")) + + if err != nil { + return nil, err + } + + if len(metrics) < 1 { + return nil, fmt.Errorf("Can not parse the line: %s, for data format: influx ", line) + } + + return metrics[0], nil +} + +func NewParser() *JsonParser { + return &JsonParser{} +} + +func (p *JsonParser) InitConfig(configs map[string]interface{}) error { + return nil +} + +func init() { + encoding.Add("json", func() encoding.Parser { + return NewParser() + }) +} diff --git a/internal/encoding/parser.go b/internal/encoding/parser.go deleted file mode 100644 index dc73bfe68..000000000 --- a/internal/encoding/parser.go +++ /dev/null @@ -1,90 +0,0 @@ -package encoding - -import ( - "encoding/json" - "fmt" - "time" - - "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/internal" - "github.com/influxdata/telegraf/internal/encoding/graphite" -) - -type Parser struct { - graphiteParser *graphite.Parser -} - -func NewParser(parser *graphite.Parser) *Parser { - return &Parser{graphiteParser: parser} -} - -func (p *Parser) Parse(dataFormat string, out []byte, acc telegraf.Accumulator) error { - var err error - var metrics []telegraf.Metric - var metric telegraf.Metric - - switch dataFormat { - case "", "json": - var jsonOut interface{} - err = json.Unmarshal(out, &jsonOut) - if err != nil { - err = fmt.Errorf("unable to parse out as JSON, %s", err) - break - } - - f := internal.JSONFlattener{} - err = f.FlattenJSON("", jsonOut) - if err != nil { - break - } - acc.AddFields("exec", f.Fields, nil) - case "influx": - now := time.Now() - metrics, err = telegraf.ParseMetrics(out) - for _, metric = range metrics { - acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), now) - } - case "graphite": - metrics, err = p.graphiteParser.ParseMetrics(out) - for _, metric = range metrics { - acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time()) - } - default: - err = fmt.Errorf("Unsupported data format: %s. Must be either json, influx or graphite ", dataFormat) - } - - return err -} - -func (p *Parser) ParseSocketLines(dataFormat string, buf []byte) ([]telegraf.Metric, error) { - var err error - var metrics []telegraf.Metric - - switch dataFormat { - case "", "graphite": - metrics, err = p.graphiteParser.ParseMetrics(buf) - case "influx": - metrics, err = telegraf.ParseMetrics(buf) - default: - err = fmt.Errorf("Unsupported data format: %s. Must be either influx or graphite ", dataFormat) - } - - if err != nil { - return nil, err - } - return metrics, nil -} - -func (p *Parser) ParseSocketLine(dataFormat, line string) (telegraf.Metric, error) { - metrics, err := p.ParseSocketLines(dataFormat, []byte(line+"\n")) - - if err != nil { - return nil, err - } - - if len(metrics) < 1 { - return nil, fmt.Errorf("Can not parse the line: %s, for data format: %s ", line, dataFormat) - } - - return metrics[0], nil -} diff --git a/plugins/inputs/exec/README.md b/plugins/inputs/exec/README.md index a58b3dde6..4b862b273 100644 --- a/plugins/inputs/exec/README.md +++ b/plugins/inputs/exec/README.md @@ -95,8 +95,11 @@ Now let's say we have the following configuration: ``` [[inputs.exec]] - # the command to run - command = "/usr/bin/line_protocol_collector" + # Shell/commands array + # compatible with old version + # we can still use the old command configuration + # command = "/usr/bin/line_protocol_collector" + commands = ["/usr/bin/line_protocol_collector","/tmp/test2.sh"] # Data format to consume. This can be "json" or "influx" (line-protocol) # NOTE json only reads numerical measurements, strings and booleans are ignored. diff --git a/plugins/inputs/exec/config.go b/plugins/inputs/exec/config.go deleted file mode 100644 index c4ad5d47f..000000000 --- a/plugins/inputs/exec/config.go +++ /dev/null @@ -1,25 +0,0 @@ -package exec - -import ( - "github.com/influxdata/telegraf/internal/encoding/graphite" -) - -// Config represents the configuration for Graphite endpoints. -type Config struct { - Commands []string - graphite.Config -} - -// New Config instance. -func NewConfig(commands, templates []string, separator string) *Config { - c := &Config{} - if separator == "" { - separator = graphite.DefaultSeparator - } - - c.Commands = commands - c.Templates = templates - c.Separator = separator - - return c -} diff --git a/plugins/inputs/exec/exec.go b/plugins/inputs/exec/exec.go index efccc7033..a53a6f32d 100644 --- a/plugins/inputs/exec/exec.go +++ b/plugins/inputs/exec/exec.go @@ -10,8 +10,11 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal/encoding" - "github.com/influxdata/telegraf/internal/encoding/graphite" "github.com/influxdata/telegraf/plugins/inputs" + + _ "github.com/influxdata/telegraf/internal/encoding/graphite" + _ "github.com/influxdata/telegraf/internal/encoding/influx" + _ "github.com/influxdata/telegraf/internal/encoding/json" ) const sampleConfig = ` @@ -57,9 +60,7 @@ type Exec struct { Separator string Templates []string - encodingParser *encoding.Parser - - config *Config + encodingParser encoding.Parser initedConfig bool @@ -107,8 +108,13 @@ func (e *Exec) ProcessCommand(command string, acc telegraf.Accumulator) { return } - if err = e.encodingParser.Parse(e.DataFormat, out, acc); err != nil { + metrics, err := e.encodingParser.Parse(out) + if err != nil { e.errc <- err + } else { + for _, metric := range metrics { + acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time()) + } } } @@ -120,23 +126,22 @@ func (e *Exec) initConfig() error { e.Commands = []string{e.Command} } - c := NewConfig(e.Commands, e.Templates, e.Separator) - if err := c.Validate(); err != nil { - return fmt.Errorf("exec configuration is error: %s ", err.Error()) - + if e.DataFormat == "" { + e.DataFormat = "json" } - e.config = c - graphiteParser, err := graphite.NewParserWithOptions(graphite.Options{ - Templates: e.config.Templates, - Separator: e.config.Separator}) + var err error + + configs := make(map[string]interface{}) + configs["Separator"] = e.Separator + configs["Templates"] = e.Templates + + e.encodingParser, err = encoding.NewParser(e.DataFormat, configs) if err != nil { - return fmt.Errorf("exec input parser config is error: %s ", err.Error()) + return fmt.Errorf("exec configuration is error: %s ", err.Error()) } - e.encodingParser = encoding.NewParser(graphiteParser) - return nil } diff --git a/plugins/inputs/socket/config.go b/plugins/inputs/socket/config.go deleted file mode 100644 index 9115571da..000000000 --- a/plugins/inputs/socket/config.go +++ /dev/null @@ -1,58 +0,0 @@ -package socket - -import "github.com/influxdata/telegraf/internal/encoding/graphite" - -const ( - // DefaultBindAddress is the default binding interface if none is specified. - DefaultBindAddress = ":2003" - - // DefaultProtocol is the default IP protocol used by the Graphite input. - DefaultProtocol = "tcp" - - // DefaultUDPReadBuffer is the default buffer size for the UDP listener. - // Sets the size of the operating system's receive buffer associated with - // the UDP traffic. Keep in mind that the OS must be able - // to handle the number set here or the UDP listener will error and exit. - // - // DefaultReadBuffer = 0 means to use the OS default, which is usually too - // small for high UDP performance. - // - // Increasing OS buffer limits: - // Linux: sudo sysctl -w net.core.rmem_max= - // BSD/Darwin: sudo sysctl -w kern.ipc.maxsockbuf= - DefaultUdpReadBuffer = 0 -) - -// Config represents the configuration for Graphite endpoints. -type Config struct { - BindAddress string - Protocol string - UdpReadBuffer int - - graphite.Config -} - -// New Config instance. -func NewConfig(bindAddress, protocol string, udpReadBuffer int, separator string, templates []string) *Config { - c := &Config{} - if bindAddress == "" { - bindAddress = DefaultBindAddress - } - if protocol == "" { - protocol = DefaultProtocol - } - if udpReadBuffer < 0 { - udpReadBuffer = DefaultUdpReadBuffer - } - if separator == "" { - separator = graphite.DefaultSeparator - } - - c.BindAddress = bindAddress - c.Protocol = protocol - c.UdpReadBuffer = udpReadBuffer - c.Separator = separator - c.Templates = templates - - return c -} diff --git a/plugins/inputs/socket/socket.go b/plugins/inputs/socket/socket.go index 353058a43..0174b2f53 100644 --- a/plugins/inputs/socket/socket.go +++ b/plugins/inputs/socket/socket.go @@ -16,9 +16,31 @@ import ( "github.com/influxdata/telegraf/internal/encoding" "github.com/influxdata/telegraf/internal/encoding/graphite" "github.com/influxdata/telegraf/plugins/inputs" + + _ "github.com/influxdata/telegraf/internal/encoding/graphite" + _ "github.com/influxdata/telegraf/internal/encoding/influx" ) const ( + // DefaultBindAddress is the default binding interface if none is specified. + DefaultBindAddress = ":2003" + + // DefaultProtocol is the default IP protocol used by the Graphite input. + DefaultProtocol = "tcp" + + // DefaultUDPReadBuffer is the default buffer size for the UDP listener. + // Sets the size of the operating system's receive buffer associated with + // the UDP traffic. Keep in mind that the OS must be able + // to handle the number set here or the UDP listener will error and exit. + // + // DefaultReadBuffer = 0 means to use the OS default, which is usually too + // small for high UDP performance. + // + // Increasing OS buffer limits: + // Linux: sudo sysctl -w net.core.rmem_max= + // BSD/Darwin: sudo sysctl -w kern.ipc.maxsockbuf= + DefaultUdpReadBuffer = 0 + udpBufferSize = 65536 ) @@ -44,10 +66,9 @@ type Socket struct { mu sync.Mutex - encodingParser *encoding.Parser + encodingParser encoding.Parser logger *log.Logger - config *Config tcpConnectionsMu sync.Mutex tcpConnections map[string]*tcpConnection @@ -105,39 +126,43 @@ func (s *Socket) Start() error { s.mu.Lock() defer s.mu.Unlock() - c := NewConfig(s.BindAddress, s.Protocol, s.UdpReadBuffer, s.Separator, s.Templates) - - if err := c.Validate(); err != nil { - return fmt.Errorf("Socket input configuration is error: %s ", err.Error()) + if s.BindAddress == "" { + s.BindAddress = DefaultBindAddress + } + if s.Protocol == "" { + s.Protocol = DefaultProtocol + } + if s.UdpReadBuffer < 0 { + s.UdpReadBuffer = DefaultUdpReadBuffer } - s.config = c - graphiteParser, err := graphite.NewParserWithOptions(graphite.Options{ - Templates: s.config.Templates, - Separator: s.config.Separator}) + configs := make(map[string]interface{}) + configs["Separator"] = s.Separator + configs["Templates"] = s.Templates + + var err error + s.encodingParser, err = encoding.NewParser(s.DataFormat, configs) if err != nil { - return fmt.Errorf("Socket input parser config is error: %s ", err.Error()) + return fmt.Errorf("Socket input configuration is error: %s ", err.Error()) } - s.encodingParser = encoding.NewParser(graphiteParser) - s.tcpConnections = make(map[string]*tcpConnection) s.done = make(chan struct{}) s.metricC = make(chan telegraf.Metric, 50000) - if strings.ToLower(s.config.Protocol) == "tcp" { + if strings.ToLower(s.Protocol) == "tcp" { s.addr, err = s.openTCPServer() - } else if strings.ToLower(s.config.Protocol) == "udp" { + } else if strings.ToLower(s.Protocol) == "udp" { s.addr, err = s.openUDPServer() } else { - return fmt.Errorf("unrecognized Socket input protocol %s", s.config.Protocol) + return fmt.Errorf("unrecognized Socket input protocol %s", s.Protocol) } if err != nil { return err } - s.logger.Printf("Socket Plugin Listening on %s: %s", strings.ToUpper(s.config.Protocol), s.config.BindAddress) + s.logger.Printf("Socket Plugin Listening on %s: %s", strings.ToUpper(s.Protocol), s.BindAddress) return nil } @@ -170,7 +195,7 @@ func (s *Socket) Stop() { // openTCPServer opens the Socket input in TCP mode and starts processing data. func (s *Socket) openTCPServer() (net.Addr, error) { - ln, err := net.Listen("tcp", s.config.BindAddress) + ln, err := net.Listen("tcp", s.BindAddress) if err != nil { return nil, err } @@ -235,7 +260,7 @@ func (s *Socket) untrackConnection(c net.Conn) { // openUDPServer opens the Socket input in UDP mode and starts processing incoming data. func (s *Socket) openUDPServer() (net.Addr, error) { - addr, err := net.ResolveUDPAddr("udp", s.config.BindAddress) + addr, err := net.ResolveUDPAddr("udp", s.BindAddress) if err != nil { return nil, err } @@ -245,11 +270,11 @@ func (s *Socket) openUDPServer() (net.Addr, error) { return nil, err } - if s.config.UdpReadBuffer != 0 { - err = s.udpConn.SetReadBuffer(s.config.UdpReadBuffer) + if s.UdpReadBuffer != 0 { + err = s.udpConn.SetReadBuffer(s.UdpReadBuffer) if err != nil { return nil, fmt.Errorf("unable to set UDP read buffer to %d: %s", - s.config.UdpReadBuffer, err) + s.UdpReadBuffer, err) } } @@ -276,7 +301,7 @@ func (s *Socket) handleLines(buf []byte) { } // Parse it. - metrics, err := s.encodingParser.ParseSocketLines(s.DataFormat, buf) + metrics, err := s.encodingParser.Parse(buf) if err != nil { switch err := err.(type) { case *graphite.UnsupposedValueError: