Add call to optional Init function for all plugins (#5899)

This commit is contained in:
Daniel Nelson 2019-06-14 15:12:27 -07:00 committed by GitHub
parent b35beb2fba
commit 0ff9c8ef88
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 131 additions and 39 deletions

View File

@ -39,8 +39,14 @@ func (a *Agent) Run(ctx context.Context) error {
return ctx.Err() return ctx.Err()
} }
log.Printf("D! [agent] Initializing plugins")
err := a.initPlugins()
if err != nil {
return err
}
log.Printf("D! [agent] Connecting outputs") log.Printf("D! [agent] Connecting outputs")
err := a.connectOutputs(ctx) err = a.connectOutputs(ctx)
if err != nil { if err != nil {
return err return err
} }
@ -185,6 +191,11 @@ func (a *Agent) Test(ctx context.Context, waitDuration time.Duration) error {
} }
for _, input := range a.Config.Inputs { for _, input := range a.Config.Inputs {
err := input.Init()
if err != nil {
return err
}
select { select {
case <-ctx.Done(): case <-ctx.Done():
return nil return nil
@ -596,6 +607,39 @@ func (a *Agent) flushOnce(
} }
// initPlugins runs the Init function on plugins.
func (a *Agent) initPlugins() error {
for _, input := range a.Config.Inputs {
err := input.Init()
if err != nil {
return fmt.Errorf("could not initialize input %s: %v",
input.Config.Name, err)
}
}
for _, processor := range a.Config.Processors {
err := processor.Init()
if err != nil {
return fmt.Errorf("could not initialize processor %s: %v",
processor.Config.Name, err)
}
}
for _, aggregator := range a.Config.Aggregators {
err := aggregator.Init()
if err != nil {
return fmt.Errorf("could not initialize aggregator %s: %v",
aggregator.Config.Name, err)
}
}
for _, output := range a.Config.Outputs {
err := output.Init()
if err != nil {
return fmt.Errorf("could not initialize output %s: %v",
output.Config.Name, err)
}
}
return nil
}
// connectOutputs connects to all outputs. // connectOutputs connects to all outputs.
func (a *Agent) connectOutputs(ctx context.Context) error { func (a *Agent) connectOutputs(ctx context.Context) error {
for _, output := range a.Config.Outputs { for _, output := range a.Config.Outputs {

View File

@ -52,6 +52,10 @@ var sampleConfig = `
drop_original = false drop_original = false
` `
func (m *Min) Init() error {
return nil
}
func (m *Min) SampleConfig() string { func (m *Min) SampleConfig() string {
return sampleConfig return sampleConfig
} }

View File

@ -52,6 +52,10 @@ func (s *Simple) SampleConfig() string {
` `
} }
func (s *Simple) Init() error {
return nil
}
func (s *Simple) Gather(acc telegraf.Accumulator) error { func (s *Simple) Gather(acc telegraf.Accumulator) error {
if s.Ok { if s.Ok {
acc.AddFields("state", map[string]interface{}{"value": "pretty good"}, nil) acc.AddFields("state", map[string]interface{}{"value": "pretty good"}, nil)

View File

@ -43,6 +43,10 @@ func (s *Simple) SampleConfig() string {
` `
} }
func (s *Simple) Init() error {
return nil
}
func (s *Simple) Connect() error { func (s *Simple) Connect() error {
// Make a connection to the URL here // Make a connection to the URL here
return nil return nil

View File

@ -46,6 +46,10 @@ func (p *Printer) Description() string {
return "Print all metrics that pass through this filter." return "Print all metrics that pass through this filter."
} }
func (p *Printer) Init() error {
return nil
}
func (p *Printer) Apply(in ...telegraf.Metric) []telegraf.Metric { func (p *Printer) Apply(in ...telegraf.Metric) []telegraf.Metric {
for _, metric := range in { for _, metric := range in {
fmt.Println(metric.String()) fmt.Println(metric.String())

View File

@ -1,5 +1,14 @@
package telegraf package telegraf
// Initializer is an interface that all plugin types: Inputs, Outputs,
// Processors, and Aggregators can optionally implement to initialize the
// plugin.
type Initializer interface {
// Init performs one time setup of the plugin and returns an error if the
// configuration is invalid.
Init() error
}
type Input interface { type Input interface {
// SampleConfig returns the default configuration of the Input // SampleConfig returns the default configuration of the Input
SampleConfig() string SampleConfig() string

View File

@ -71,6 +71,16 @@ func (r *RunningAggregator) Name() string {
return "aggregators." + r.Config.Name return "aggregators." + r.Config.Name
} }
func (r *RunningAggregator) Init() error {
if p, ok := r.Aggregator.(telegraf.Initializer); ok {
err := p.Init()
if err != nil {
return err
}
}
return nil
}
func (r *RunningAggregator) Period() time.Duration { func (r *RunningAggregator) Period() time.Duration {
return r.Config.Period return r.Config.Period
} }

View File

@ -56,6 +56,16 @@ func (r *RunningInput) metricFiltered(metric telegraf.Metric) {
metric.Drop() metric.Drop()
} }
func (r *RunningInput) Init() error {
if p, ok := r.Input.(telegraf.Initializer); ok {
err := p.Init()
if err != nil {
return err
}
}
return nil
}
func (r *RunningInput) MakeMetric(metric telegraf.Metric) telegraf.Metric { func (r *RunningInput) MakeMetric(metric telegraf.Metric) telegraf.Metric {
if ok := r.Config.Filter.Select(metric); !ok { if ok := r.Config.Filter.Select(metric); !ok {
r.metricFiltered(metric) r.metricFiltered(metric)

View File

@ -97,6 +97,16 @@ func (ro *RunningOutput) metricFiltered(metric telegraf.Metric) {
metric.Drop() metric.Drop()
} }
func (ro *RunningOutput) Init() error {
if p, ok := ro.Output.(telegraf.Initializer); ok {
err := p.Init()
if err != nil {
return err
}
}
return nil
}
// AddMetric adds a metric to the output. // AddMetric adds a metric to the output.
// //
// Takes ownership of metric // Takes ownership of metric

View File

@ -40,6 +40,16 @@ func containsMetric(item telegraf.Metric, metrics []telegraf.Metric) bool {
return false return false
} }
func (rp *RunningProcessor) Init() error {
if p, ok := rp.Processor.(telegraf.Initializer); ok {
err := p.Init()
if err != nil {
return err
}
}
return nil
}
func (rp *RunningProcessor) Apply(in ...telegraf.Metric) []telegraf.Metric { func (rp *RunningProcessor) Apply(in ...telegraf.Metric) []telegraf.Metric {
rp.Lock() rp.Lock()
defer rp.Unlock() defer rp.Unlock()

View File

@ -1,7 +1,6 @@
package http package http
import ( import (
"errors"
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
@ -89,18 +88,12 @@ func (*HTTP) Description() string {
return "Read formatted metrics from one or more HTTP endpoints" return "Read formatted metrics from one or more HTTP endpoints"
} }
// Gather takes in an accumulator and adds the metrics that the Input func (h *HTTP) Init() error {
// gathers. This is called every "interval"
func (h *HTTP) Gather(acc telegraf.Accumulator) error {
if h.parser == nil {
return errors.New("Parser is not set")
}
if h.client == nil {
tlsCfg, err := h.ClientConfig.TLSConfig() tlsCfg, err := h.ClientConfig.TLSConfig()
if err != nil { if err != nil {
return err return err
} }
h.client = &http.Client{ h.client = &http.Client{
Transport: &http.Transport{ Transport: &http.Transport{
TLSClientConfig: tlsCfg, TLSClientConfig: tlsCfg,
@ -108,8 +101,12 @@ func (h *HTTP) Gather(acc telegraf.Accumulator) error {
}, },
Timeout: h.Timeout.Duration, Timeout: h.Timeout.Duration,
} }
} return nil
}
// Gather takes in an accumulator and adds the metrics that the Input
// gathers. This is called every "interval"
func (h *HTTP) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup var wg sync.WaitGroup
for _, u := range h.URLs { for _, u := range h.URLs {
wg.Add(1) wg.Add(1)

View File

@ -37,6 +37,7 @@ func TestHTTPwithJSONFormat(t *testing.T) {
plugin.SetParser(p) plugin.SetParser(p)
var acc testutil.Accumulator var acc testutil.Accumulator
plugin.Init()
require.NoError(t, acc.GatherError(plugin.Gather)) require.NoError(t, acc.GatherError(plugin.Gather))
require.Len(t, acc.Metrics, 1) require.Len(t, acc.Metrics, 1)
@ -78,6 +79,7 @@ func TestHTTPHeaders(t *testing.T) {
plugin.SetParser(p) plugin.SetParser(p)
var acc testutil.Accumulator var acc testutil.Accumulator
plugin.Init()
require.NoError(t, acc.GatherError(plugin.Gather)) require.NoError(t, acc.GatherError(plugin.Gather))
} }
@ -100,6 +102,7 @@ func TestInvalidStatusCode(t *testing.T) {
plugin.SetParser(p) plugin.SetParser(p)
var acc testutil.Accumulator var acc testutil.Accumulator
plugin.Init()
require.Error(t, acc.GatherError(plugin.Gather)) require.Error(t, acc.GatherError(plugin.Gather))
} }
@ -125,28 +128,10 @@ func TestMethod(t *testing.T) {
plugin.SetParser(p) plugin.SetParser(p)
var acc testutil.Accumulator var acc testutil.Accumulator
plugin.Init()
require.NoError(t, acc.GatherError(plugin.Gather)) require.NoError(t, acc.GatherError(plugin.Gather))
} }
func TestParserNotSet(t *testing.T) {
fakeServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/endpoint" {
_, _ = w.Write([]byte(simpleJSON))
} else {
w.WriteHeader(http.StatusNotFound)
}
}))
defer fakeServer.Close()
url := fakeServer.URL + "/endpoint"
plugin := &plugin.HTTP{
URLs: []string{url},
}
var acc testutil.Accumulator
require.Error(t, acc.GatherError(plugin.Gather))
}
const simpleJSON = ` const simpleJSON = `
{ {
"a": 1.2 "a": 1.2
@ -237,6 +222,7 @@ func TestBodyAndContentEncoding(t *testing.T) {
tt.plugin.SetParser(parser) tt.plugin.SetParser(parser)
var acc testutil.Accumulator var acc testutil.Accumulator
tt.plugin.Init()
err = tt.plugin.Gather(&acc) err = tt.plugin.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)
}) })