add support for streaming processors (#7634)

This commit is contained in:
Steven Soroka 2020-06-05 10:43:43 -04:00 committed by GitHub
parent b99e3bc63d
commit 741ea839d2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 913 additions and 579 deletions

File diff suppressed because it is too large Load Diff

View File

@ -5,11 +5,7 @@ package telegraf
// Add, Push, and Reset can not be called concurrently, so locking is not // Add, Push, and Reset can not be called concurrently, so locking is not
// required when implementing an Aggregator plugin. // required when implementing an Aggregator plugin.
type Aggregator interface { type Aggregator interface {
// SampleConfig returns the default configuration of the Input. PluginDescriber
SampleConfig() string
// Description returns a one-sentence description on the Input.
Description() string
// Add the metric to the aggregator. // Add the metric to the aggregator.
Add(in Metric) Add(in Metric)

View File

@ -66,6 +66,7 @@ type Config struct {
Aggregators []*models.RunningAggregator Aggregators []*models.RunningAggregator
// Processors have a slice wrapper type because they need to be sorted // Processors have a slice wrapper type because they need to be sorted
Processors models.RunningProcessors Processors models.RunningProcessors
AggProcessors models.RunningProcessors
} }
func NewConfig() *Config { func NewConfig() *Config {
@ -83,6 +84,7 @@ func NewConfig() *Config {
Inputs: make([]*models.RunningInput, 0), Inputs: make([]*models.RunningInput, 0),
Outputs: make([]*models.RunningOutput, 0), Outputs: make([]*models.RunningOutput, 0),
Processors: make([]*models.RunningProcessor, 0), Processors: make([]*models.RunningProcessor, 0),
AggProcessors: make([]*models.RunningProcessor, 0),
InputFilters: make([]string, 0), InputFilters: make([]string, 0),
OutputFilters: make([]string, 0), OutputFilters: make([]string, 0),
} }
@ -561,12 +563,7 @@ func printFilteredGlobalSections(sectionFilters []string) {
} }
} }
type printer interface { func printConfig(name string, p telegraf.PluginDescriber, op string, commented bool) {
Description() string
SampleConfig() string
}
func printConfig(name string, p printer, op string, commented bool) {
comment := "" comment := ""
if commented { if commented {
comment = "# " comment = "# "
@ -684,12 +681,20 @@ func (c *Config) LoadConfig(path string) error {
} }
data, err := loadConfig(path) data, err := loadConfig(path)
if err != nil { if err != nil {
return fmt.Errorf("Error loading %s, %s", path, err) return fmt.Errorf("Error loading config file %s: %w", path, err)
} }
if err = c.LoadConfigData(data); err != nil {
return fmt.Errorf("Error loading config file %s: %w", path, err)
}
return nil
}
// LoadConfigData loads TOML-formatted config data
func (c *Config) LoadConfigData(data []byte) error {
tbl, err := parseConfig(data) tbl, err := parseConfig(data)
if err != nil { if err != nil {
return fmt.Errorf("Error parsing %s, %s", path, err) return fmt.Errorf("Error parsing data: %s", err)
} }
// Parse tags tables first: // Parse tags tables first:
@ -697,11 +702,10 @@ func (c *Config) LoadConfig(path string) error {
if val, ok := tbl.Fields[tableName]; ok { if val, ok := tbl.Fields[tableName]; ok {
subTable, ok := val.(*ast.Table) subTable, ok := val.(*ast.Table)
if !ok { if !ok {
return fmt.Errorf("%s: invalid configuration", path) return fmt.Errorf("invalid configuration, bad table name %q", tableName)
} }
if err = toml.UnmarshalTable(subTable, c.Tags); err != nil { if err = toml.UnmarshalTable(subTable, c.Tags); err != nil {
log.Printf("E! Could not parse [global_tags] config\n") return fmt.Errorf("error parsing table name %q: %w", tableName, err)
return fmt.Errorf("Error parsing %s, %s", path, err)
} }
} }
} }
@ -710,11 +714,10 @@ func (c *Config) LoadConfig(path string) error {
if val, ok := tbl.Fields["agent"]; ok { if val, ok := tbl.Fields["agent"]; ok {
subTable, ok := val.(*ast.Table) subTable, ok := val.(*ast.Table)
if !ok { if !ok {
return fmt.Errorf("%s: invalid configuration", path) return fmt.Errorf("invalid configuration, error parsing agent table")
} }
if err = toml.UnmarshalTable(subTable, c.Agent); err != nil { if err = toml.UnmarshalTable(subTable, c.Agent); err != nil {
log.Printf("E! Could not parse [agent] config\n") return fmt.Errorf("error parsing agent table: %w", err)
return fmt.Errorf("Error parsing %s, %s", path, err)
} }
} }
@ -735,7 +738,7 @@ func (c *Config) LoadConfig(path string) error {
for name, val := range tbl.Fields { for name, val := range tbl.Fields {
subTable, ok := val.(*ast.Table) subTable, ok := val.(*ast.Table)
if !ok { if !ok {
return fmt.Errorf("%s: invalid configuration", path) return fmt.Errorf("invalid configuration, error parsing field %q as table", name)
} }
switch name { switch name {
@ -746,17 +749,17 @@ func (c *Config) LoadConfig(path string) error {
// legacy [outputs.influxdb] support // legacy [outputs.influxdb] support
case *ast.Table: case *ast.Table:
if err = c.addOutput(pluginName, pluginSubTable); err != nil { if err = c.addOutput(pluginName, pluginSubTable); err != nil {
return fmt.Errorf("Error parsing %s, %s", path, err) return fmt.Errorf("Error parsing %s, %s", pluginName, err)
} }
case []*ast.Table: case []*ast.Table:
for _, t := range pluginSubTable { for _, t := range pluginSubTable {
if err = c.addOutput(pluginName, t); err != nil { if err = c.addOutput(pluginName, t); err != nil {
return fmt.Errorf("Error parsing %s, %s", path, err) return fmt.Errorf("Error parsing %s array, %s", pluginName, err)
} }
} }
default: default:
return fmt.Errorf("Unsupported config format: %s, file %s", return fmt.Errorf("Unsupported config format: %s",
pluginName, path) pluginName)
} }
} }
case "inputs", "plugins": case "inputs", "plugins":
@ -765,17 +768,17 @@ func (c *Config) LoadConfig(path string) error {
// legacy [inputs.cpu] support // legacy [inputs.cpu] support
case *ast.Table: case *ast.Table:
if err = c.addInput(pluginName, pluginSubTable); err != nil { if err = c.addInput(pluginName, pluginSubTable); err != nil {
return fmt.Errorf("Error parsing %s, %s", path, err) return fmt.Errorf("Error parsing %s, %s", pluginName, err)
} }
case []*ast.Table: case []*ast.Table:
for _, t := range pluginSubTable { for _, t := range pluginSubTable {
if err = c.addInput(pluginName, t); err != nil { if err = c.addInput(pluginName, t); err != nil {
return fmt.Errorf("Error parsing %s, %s", path, err) return fmt.Errorf("Error parsing %s, %s", pluginName, err)
} }
} }
default: default:
return fmt.Errorf("Unsupported config format: %s, file %s", return fmt.Errorf("Unsupported config format: %s",
pluginName, path) pluginName)
} }
} }
case "processors": case "processors":
@ -784,12 +787,12 @@ func (c *Config) LoadConfig(path string) error {
case []*ast.Table: case []*ast.Table:
for _, t := range pluginSubTable { for _, t := range pluginSubTable {
if err = c.addProcessor(pluginName, t); err != nil { if err = c.addProcessor(pluginName, t); err != nil {
return fmt.Errorf("Error parsing %s, %s", path, err) return fmt.Errorf("Error parsing %s, %s", pluginName, err)
} }
} }
default: default:
return fmt.Errorf("Unsupported config format: %s, file %s", return fmt.Errorf("Unsupported config format: %s",
pluginName, path) pluginName)
} }
} }
case "aggregators": case "aggregators":
@ -798,19 +801,19 @@ func (c *Config) LoadConfig(path string) error {
case []*ast.Table: case []*ast.Table:
for _, t := range pluginSubTable { for _, t := range pluginSubTable {
if err = c.addAggregator(pluginName, t); err != nil { if err = c.addAggregator(pluginName, t); err != nil {
return fmt.Errorf("Error parsing %s, %s", path, err) return fmt.Errorf("Error parsing %s, %s", pluginName, err)
} }
} }
default: default:
return fmt.Errorf("Unsupported config format: %s, file %s", return fmt.Errorf("Unsupported config format: %s",
pluginName, path) pluginName)
} }
} }
// Assume it's an input input for legacy config file support if no other // Assume it's an input input for legacy config file support if no other
// identifiers are present // identifiers are present
default: default:
if err = c.addInput(name, subTable); err != nil { if err = c.addInput(name, subTable); err != nil {
return fmt.Errorf("Error parsing %s, %s", path, err) return fmt.Errorf("Error parsing %s, %s", name, err)
} }
} }
} }
@ -929,21 +932,48 @@ func (c *Config) addProcessor(name string, table *ast.Table) error {
if !ok { if !ok {
return fmt.Errorf("Undefined but requested processor: %s", name) return fmt.Errorf("Undefined but requested processor: %s", name)
} }
processor := creator()
processorConfig, err := buildProcessor(name, table) processorConfig, err := buildProcessor(name, table)
if err != nil { if err != nil {
return err return err
} }
if err := toml.UnmarshalTable(table, processor); err != nil { rf, err := c.newRunningProcessor(creator, processorConfig, name, table)
if err != nil {
return err return err
} }
c.Processors = append(c.Processors, rf)
// save a copy for the aggregator
rf, err = c.newRunningProcessor(creator, processorConfig, name, table)
if err != nil {
return err
}
c.AggProcessors = append(c.AggProcessors, rf)
return nil
}
func (c *Config) newRunningProcessor(
creator processors.StreamingCreator,
processorConfig *models.ProcessorConfig,
name string,
table *ast.Table,
) (*models.RunningProcessor, error) {
processor := creator()
if p, ok := processor.(unwrappable); ok {
if err := toml.UnmarshalTable(table, p.Unwrap()); err != nil {
return nil, err
}
} else {
if err := toml.UnmarshalTable(table, processor); err != nil {
return nil, err
}
}
rf := models.NewRunningProcessor(processor, processorConfig) rf := models.NewRunningProcessor(processor, processorConfig)
return rf, nil
c.Processors = append(c.Processors, rf)
return nil
} }
func (c *Config) addOutput(name string, table *ast.Table) error { func (c *Config) addOutput(name string, table *ast.Table) error {
@ -2195,3 +2225,10 @@ func buildOutput(name string, tbl *ast.Table) (*models.OutputConfig, error) {
return oc, nil return oc, nil
} }
// unwrappable lets you retrieve the original telegraf.Processor from the
// StreamingProcessor. This is necessary because the toml Unmarshaller won't
// look inside composed types.
type unwrappable interface {
Unwrap() telegraf.Processor
}

View File

@ -207,7 +207,7 @@ func TestConfig_FieldNotDefined(t *testing.T) {
c := NewConfig() c := NewConfig()
err := c.LoadConfig("./testdata/invalid_field.toml") err := c.LoadConfig("./testdata/invalid_field.toml")
require.Error(t, err, "invalid field name") require.Error(t, err, "invalid field name")
assert.Equal(t, "Error parsing ./testdata/invalid_field.toml, line 2: field corresponding to `not_a_field' is not defined in http_listener_v2.HTTPListenerV2", err.Error()) assert.Equal(t, "Error loading config file ./testdata/invalid_field.toml: Error parsing http_listener_v2, line 2: field corresponding to `not_a_field' is not defined in http_listener_v2.HTTPListenerV2", err.Error())
} }
@ -215,12 +215,12 @@ func TestConfig_WrongFieldType(t *testing.T) {
c := NewConfig() c := NewConfig()
err := c.LoadConfig("./testdata/wrong_field_type.toml") err := c.LoadConfig("./testdata/wrong_field_type.toml")
require.Error(t, err, "invalid field type") require.Error(t, err, "invalid field type")
assert.Equal(t, "Error parsing ./testdata/wrong_field_type.toml, line 2: (http_listener_v2.HTTPListenerV2.Port) cannot unmarshal TOML string into int", err.Error()) assert.Equal(t, "Error loading config file ./testdata/wrong_field_type.toml: Error parsing http_listener_v2, line 2: (http_listener_v2.HTTPListenerV2.Port) cannot unmarshal TOML string into int", err.Error())
c = NewConfig() c = NewConfig()
err = c.LoadConfig("./testdata/wrong_field_type2.toml") err = c.LoadConfig("./testdata/wrong_field_type2.toml")
require.Error(t, err, "invalid field type2") require.Error(t, err, "invalid field type2")
assert.Equal(t, "Error parsing ./testdata/wrong_field_type2.toml, line 2: (http_listener_v2.HTTPListenerV2.Methods) cannot unmarshal TOML string into []string", err.Error()) assert.Equal(t, "Error loading config file ./testdata/wrong_field_type2.toml: Error parsing http_listener_v2, line 2: (http_listener_v2.HTTPListenerV2.Methods) cannot unmarshal TOML string into []string", err.Error())
} }
func TestConfig_InlineTables(t *testing.T) { func TestConfig_InlineTables(t *testing.T) {
@ -255,5 +255,5 @@ func TestConfig_BadOrdering(t *testing.T) {
c := NewConfig() c := NewConfig()
err := c.LoadConfig("./testdata/non_slice_slice.toml") err := c.LoadConfig("./testdata/non_slice_slice.toml")
require.Error(t, err, "bad ordering") require.Error(t, err, "bad ordering")
assert.Equal(t, "Error parsing ./testdata/non_slice_slice.toml, line 4: cannot unmarshal TOML array into string (need slice)", err.Error()) assert.Equal(t, "Error loading config file ./testdata/non_slice_slice.toml: Error parsing http array, line 4: cannot unmarshal TOML array into string (need slice)", err.Error())
} }

View File

@ -1,11 +1,7 @@
package telegraf package telegraf
type Input interface { type Input interface {
// SampleConfig returns the default configuration of the Input PluginDescriber
SampleConfig() string
// Description returns a one-sentence description on the Input
Description() string
// Gather takes in an accumulator and adds the metrics that the Input // Gather takes in an accumulator and adds the metrics that the Input
// gathers. This is called every "interval" // gathers. This is called every "interval"

View File

@ -10,7 +10,7 @@ import (
type RunningProcessor struct { type RunningProcessor struct {
sync.Mutex sync.Mutex
log telegraf.Logger log telegraf.Logger
Processor telegraf.Processor Processor telegraf.StreamingProcessor
Config *ProcessorConfig Config *ProcessorConfig
} }
@ -28,7 +28,7 @@ type ProcessorConfig struct {
Filter Filter Filter Filter
} }
func NewRunningProcessor(processor telegraf.Processor, config *ProcessorConfig) *RunningProcessor { func NewRunningProcessor(processor telegraf.StreamingProcessor, config *ProcessorConfig) *RunningProcessor {
tags := map[string]string{"processor": config.Name} tags := map[string]string{"processor": config.Name}
if config.Alias != "" { if config.Alias != "" {
tags["alias"] = config.Alias tags["alias"] = config.Alias
@ -52,15 +52,6 @@ func (rp *RunningProcessor) metricFiltered(metric telegraf.Metric) {
metric.Drop() metric.Drop()
} }
func containsMetric(item telegraf.Metric, metrics []telegraf.Metric) bool {
for _, m := range metrics {
if item == m {
return true
}
}
return false
}
func (r *RunningProcessor) Init() error { func (r *RunningProcessor) Init() error {
if p, ok := r.Processor.(telegraf.Initializer); ok { if p, ok := r.Processor.(telegraf.Initializer); ok {
err := p.Init() err := p.Init()
@ -71,34 +62,39 @@ func (r *RunningProcessor) Init() error {
return nil return nil
} }
func (rp *RunningProcessor) Apply(in ...telegraf.Metric) []telegraf.Metric {
rp.Lock()
defer rp.Unlock()
ret := []telegraf.Metric{}
for _, metric := range in {
// In processors when a filter selects a metric it is sent through the
// processor. Otherwise the metric continues downstream unmodified.
if ok := rp.Config.Filter.Select(metric); !ok {
ret = append(ret, metric)
continue
}
rp.Config.Filter.Modify(metric)
if len(metric.FieldList()) == 0 {
rp.metricFiltered(metric)
continue
}
// This metric should pass through the filter, so call the filter Apply
// function and append results to the output slice.
ret = append(ret, rp.Processor.Apply(metric)...)
}
return ret
}
func (r *RunningProcessor) Log() telegraf.Logger { func (r *RunningProcessor) Log() telegraf.Logger {
return r.log return r.log
} }
func (r *RunningProcessor) LogName() string {
return logName("processors", r.Config.Name, r.Config.Alias)
}
func (r *RunningProcessor) MakeMetric(metric telegraf.Metric) telegraf.Metric {
return metric
}
func (r *RunningProcessor) Start(acc telegraf.Accumulator) error {
return r.Processor.Start(acc)
}
func (r *RunningProcessor) Add(m telegraf.Metric, acc telegraf.Accumulator) {
if ok := r.Config.Filter.Select(m); !ok {
// pass downstream
acc.AddMetric(m)
return
}
r.Config.Filter.Modify(m)
if len(m.FieldList()) == 0 {
// drop metric
r.metricFiltered(m)
return
}
r.Processor.Add(m, acc)
}
func (r *RunningProcessor) Stop() {
r.Processor.Stop()
}

View File

@ -6,8 +6,8 @@ import (
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/processors"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -43,7 +43,7 @@ func TagProcessor(key, value string) *MockProcessor {
func TestRunningProcessor_Apply(t *testing.T) { func TestRunningProcessor_Apply(t *testing.T) {
type args struct { type args struct {
Processor telegraf.Processor Processor telegraf.StreamingProcessor
Config *ProcessorConfig Config *ProcessorConfig
} }
@ -56,7 +56,7 @@ func TestRunningProcessor_Apply(t *testing.T) {
{ {
name: "inactive filter applies metrics", name: "inactive filter applies metrics",
args: args{ args: args{
Processor: TagProcessor("apply", "true"), Processor: processors.NewStreamingProcessorFromProcessor(TagProcessor("apply", "true")),
Config: &ProcessorConfig{ Config: &ProcessorConfig{
Filter: Filter{}, Filter: Filter{},
}, },
@ -87,7 +87,7 @@ func TestRunningProcessor_Apply(t *testing.T) {
{ {
name: "filter applies", name: "filter applies",
args: args{ args: args{
Processor: TagProcessor("apply", "true"), Processor: processors.NewStreamingProcessorFromProcessor(TagProcessor("apply", "true")),
Config: &ProcessorConfig{ Config: &ProcessorConfig{
Filter: Filter{ Filter: Filter{
NamePass: []string{"cpu"}, NamePass: []string{"cpu"},
@ -120,7 +120,7 @@ func TestRunningProcessor_Apply(t *testing.T) {
{ {
name: "filter doesn't apply", name: "filter doesn't apply",
args: args{ args: args{
Processor: TagProcessor("apply", "true"), Processor: processors.NewStreamingProcessorFromProcessor(TagProcessor("apply", "true")),
Config: &ProcessorConfig{ Config: &ProcessorConfig{
Filter: Filter{ Filter: Filter{
NameDrop: []string{"cpu"}, NameDrop: []string{"cpu"},
@ -158,7 +158,15 @@ func TestRunningProcessor_Apply(t *testing.T) {
} }
rp.Config.Filter.Compile() rp.Config.Filter.Compile()
actual := rp.Apply(tt.input...) acc := testutil.Accumulator{}
err := rp.Start(&acc)
require.NoError(t, err)
for _, m := range tt.input {
rp.Add(m, &acc)
}
rp.Stop()
actual := acc.GetTelegrafMetrics()
require.Equal(t, tt.expected, actual) require.Equal(t, tt.expected, actual)
}) })
} }

View File

@ -1,14 +1,12 @@
package telegraf package telegraf
type Output interface { type Output interface {
PluginDescriber
// Connect to the Output // Connect to the Output
Connect() error Connect() error
// Close any connections to the Output // Close any connections to the Output
Close() error Close() error
// Description returns a one-sentence description on the Output
Description() string
// SampleConfig returns the default configuration of the Output
SampleConfig() string
// Write takes in group of points to be written to the Output // Write takes in group of points to be written to the Output
Write(metrics []Metric) error Write(metrics []Metric) error
} }

View File

@ -9,6 +9,16 @@ type Initializer interface {
Init() error Init() error
} }
// PluginDescriber contains the functions all plugins must implement to describe
// themselves to Telegraf
type PluginDescriber interface {
// SampleConfig returns the default configuration of the Processor
SampleConfig() string
// Description returns a one-sentence description on the Processor
Description() string
}
// Logger defines an interface for logging. // Logger defines an interface for logging.
type Logger interface { type Logger interface {
// Errorf logs an error message, patterned after log.Printf. // Errorf logs an error message, patterned after log.Printf.

View File

@ -3,9 +3,24 @@ package processors
import "github.com/influxdata/telegraf" import "github.com/influxdata/telegraf"
type Creator func() telegraf.Processor type Creator func() telegraf.Processor
type StreamingCreator func() telegraf.StreamingProcessor
var Processors = map[string]Creator{} // all processors are streaming processors.
// telegraf.Processor processors are upgraded to telegraf.StreamingProcessor
var Processors = map[string]StreamingCreator{}
// Add adds a telegraf.Processor processor
func Add(name string, creator Creator) { func Add(name string, creator Creator) {
Processors[name] = upgradeToStreamingProcessor(creator)
}
// AddStreaming adds a telegraf.StreamingProcessor streaming processor
func AddStreaming(name string, creator StreamingCreator) {
Processors[name] = creator Processors[name] = creator
} }
func upgradeToStreamingProcessor(oldCreator Creator) StreamingCreator {
return func() telegraf.StreamingProcessor {
return NewStreamingProcessorFromProcessor(oldCreator())
}
}

View File

@ -0,0 +1,49 @@
package processors
import (
"github.com/influxdata/telegraf"
)
// NewStreamingProcessorFromProcessor is a converter that turns a standard
// processor into a streaming processor
func NewStreamingProcessorFromProcessor(p telegraf.Processor) telegraf.StreamingProcessor {
sp := &streamingProcessor{
processor: p,
}
return sp
}
type streamingProcessor struct {
processor telegraf.Processor
acc telegraf.Accumulator
}
func (sp *streamingProcessor) SampleConfig() string {
return sp.processor.SampleConfig()
}
func (sp *streamingProcessor) Description() string {
return sp.processor.Description()
}
func (sp *streamingProcessor) Start(acc telegraf.Accumulator) error {
sp.acc = acc
return nil
}
func (sp *streamingProcessor) Add(m telegraf.Metric, acc telegraf.Accumulator) {
for _, m := range sp.processor.Apply(m) {
acc.AddMetric(m)
}
}
func (sp *streamingProcessor) Stop() error {
return nil
}
// Unwrap lets you retrieve the original telegraf.Processor from the
// StreamingProcessor. This is necessary because the toml Unmarshaller won't
// look inside composed types.
func (sp *streamingProcessor) Unwrap() telegraf.Processor {
return sp.processor
}

View File

@ -1,12 +1,31 @@
package telegraf package telegraf
// Processor is a processor plugin interface for defining new inline processors.
// these are extremely efficient and should be used over StreamingProcessor if
// you do not need asynchronous metric writes.
type Processor interface { type Processor interface {
// SampleConfig returns the default configuration of the Input PluginDescriber
SampleConfig() string
// Description returns a one-sentence description on the Input
Description() string
// Apply the filter to the given metric. // Apply the filter to the given metric.
Apply(in ...Metric) []Metric Apply(in ...Metric) []Metric
} }
// StreamingProcessor is a processor that can take in a stream of messages
type StreamingProcessor interface {
PluginDescriber
// Start is the initializer for the processor
// Start is only called once per plugin instance, and never in parallel.
// Start should exit immediately after setup
Start(acc Accumulator) error
// Add is called for each metric to be processed.
Add(metric Metric, acc Accumulator)
// Stop gives you a callback to free resources.
// by the time Stop is called, the input stream will have already been closed
// and Add will not be called anymore.
// When stop returns, you should no longer be writing metrics to the
// accumulator.
Stop() error
}