Sflow rework (#7253)

This commit is contained in:
Steven Soroka
2020-04-29 17:28:55 -04:00
committed by GitHub
parent a0679c5427
commit 07c6b78c8f
16 changed files with 1254 additions and 2742 deletions

View File

@@ -13,7 +13,6 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/inputs/sflow/decoder"
)
const sampleConfig = `
@@ -38,11 +37,11 @@ type SFlow struct {
Log telegraf.Logger `toml:"-"`
addr net.Addr
decoderOpts decoder.Directive
closer io.Closer
cancel context.CancelFunc
wg sync.WaitGroup
addr net.Addr
decoder *PacketDecoder
closer io.Closer
cancel context.CancelFunc
wg sync.WaitGroup
}
// Description answers a description of this input plugin
@@ -56,14 +55,24 @@ func (s *SFlow) SampleConfig() string {
}
func (s *SFlow) Init() error {
config := NewDefaultV5FormatOptions()
s.decoderOpts = V5Format(config)
s.decoder = NewDecoder()
s.decoder.Log = s.Log
return nil
}
// Start starts this sFlow listener listening on the configured network for sFlow packets
func (s *SFlow) Start(acc telegraf.Accumulator) error {
s.decoder.OnPacket(func(p *V5Format) {
metrics, err := makeMetrics(p)
if err != nil {
s.Log.Errorf("Failed to make metric from packet: %s", err)
return
}
for _, m := range metrics {
acc.AddMetric(m)
}
})
u, err := url.Parse(s.ServiceAddress)
if err != nil {
return err
@@ -122,14 +131,9 @@ func (s *SFlow) read(acc telegraf.Accumulator, conn net.PacketConn) {
}
func (s *SFlow) process(acc telegraf.Accumulator, buf []byte) {
decoder := decoder.NewDecodeContext()
if err := decoder.Decode(s.decoderOpts, bytes.NewBuffer(buf)); err != nil {
acc.AddError(fmt.Errorf("unable to parse incoming packet: %s", err))
}
metrics := decoder.GetMetrics()
for _, m := range metrics {
acc.AddMetric(m)
if err := s.decoder.Decode(bytes.NewBuffer(buf)); err != nil {
acc.AddError(fmt.Errorf("unable to parse incoming packet: %s", err))
}
}