diff --git a/agent/agent.go b/agent/agent.go index e6e982c02..542154388 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -39,8 +39,14 @@ func (a *Agent) Run(ctx context.Context) error { return ctx.Err() } + log.Printf("D! [agent] Initializing plugins") + err := a.initPlugins() + if err != nil { + return err + } + log.Printf("D! [agent] Connecting outputs") - err := a.connectOutputs(ctx) + err = a.connectOutputs(ctx) if err != nil { return err } @@ -185,6 +191,11 @@ func (a *Agent) Test(ctx context.Context, waitDuration time.Duration) error { } for _, input := range a.Config.Inputs { + err := input.Init() + if err != nil { + return err + } + select { case <-ctx.Done(): return nil @@ -596,6 +607,39 @@ func (a *Agent) flushOnce( } +// initPlugins runs the Init function on plugins. +func (a *Agent) initPlugins() error { + for _, input := range a.Config.Inputs { + err := input.Init() + if err != nil { + return fmt.Errorf("could not initialize input %s: %v", + input.Config.Name, err) + } + } + for _, processor := range a.Config.Processors { + err := processor.Init() + if err != nil { + return fmt.Errorf("could not initialize processor %s: %v", + processor.Config.Name, err) + } + } + for _, aggregator := range a.Config.Aggregators { + err := aggregator.Init() + if err != nil { + return fmt.Errorf("could not initialize aggregator %s: %v", + aggregator.Config.Name, err) + } + } + for _, output := range a.Config.Outputs { + err := output.Init() + if err != nil { + return fmt.Errorf("could not initialize output %s: %v", + output.Config.Name, err) + } + } + return nil +} + // connectOutputs connects to all outputs. func (a *Agent) connectOutputs(ctx context.Context) error { for _, output := range a.Config.Outputs { diff --git a/docs/AGGREGATORS.md b/docs/AGGREGATORS.md index eee5b1de5..a5930a3e0 100644 --- a/docs/AGGREGATORS.md +++ b/docs/AGGREGATORS.md @@ -52,6 +52,10 @@ var sampleConfig = ` drop_original = false ` +func (m *Min) Init() error { + return nil +} + func (m *Min) SampleConfig() string { return sampleConfig } diff --git a/docs/INPUTS.md b/docs/INPUTS.md index 2f4cce3b6..f8e906f31 100644 --- a/docs/INPUTS.md +++ b/docs/INPUTS.md @@ -52,6 +52,10 @@ func (s *Simple) SampleConfig() string { ` } +func (s *Simple) Init() error { + return nil +} + func (s *Simple) Gather(acc telegraf.Accumulator) error { if s.Ok { acc.AddFields("state", map[string]interface{}{"value": "pretty good"}, nil) diff --git a/docs/OUTPUTS.md b/docs/OUTPUTS.md index 8bba4687e..9d89491cc 100644 --- a/docs/OUTPUTS.md +++ b/docs/OUTPUTS.md @@ -43,6 +43,10 @@ func (s *Simple) SampleConfig() string { ` } +func (s *Simple) Init() error { + return nil +} + func (s *Simple) Connect() error { // Make a connection to the URL here return nil diff --git a/docs/PROCESSORS.md b/docs/PROCESSORS.md index 4f18b2d55..6ea82fdae 100644 --- a/docs/PROCESSORS.md +++ b/docs/PROCESSORS.md @@ -46,6 +46,10 @@ func (p *Printer) Description() string { return "Print all metrics that pass through this filter." } +func (p *Printer) Init() error { + return nil +} + func (p *Printer) Apply(in ...telegraf.Metric) []telegraf.Metric { for _, metric := range in { fmt.Println(metric.String()) diff --git a/input.go b/input.go index 071ab7d9d..ee47bc347 100644 --- a/input.go +++ b/input.go @@ -1,5 +1,14 @@ package telegraf +// Initializer is an interface that all plugin types: Inputs, Outputs, +// Processors, and Aggregators can optionally implement to initialize the +// plugin. +type Initializer interface { + // Init performs one time setup of the plugin and returns an error if the + // configuration is invalid. + Init() error +} + type Input interface { // SampleConfig returns the default configuration of the Input SampleConfig() string diff --git a/internal/models/running_aggregator.go b/internal/models/running_aggregator.go index 8a2cd576a..8bd983eef 100644 --- a/internal/models/running_aggregator.go +++ b/internal/models/running_aggregator.go @@ -71,6 +71,16 @@ func (r *RunningAggregator) Name() string { return "aggregators." + r.Config.Name } +func (r *RunningAggregator) Init() error { + if p, ok := r.Aggregator.(telegraf.Initializer); ok { + err := p.Init() + if err != nil { + return err + } + } + return nil +} + func (r *RunningAggregator) Period() time.Duration { return r.Config.Period } diff --git a/internal/models/running_input.go b/internal/models/running_input.go index 08a804c40..73c14fc0f 100644 --- a/internal/models/running_input.go +++ b/internal/models/running_input.go @@ -56,6 +56,16 @@ func (r *RunningInput) metricFiltered(metric telegraf.Metric) { metric.Drop() } +func (r *RunningInput) Init() error { + if p, ok := r.Input.(telegraf.Initializer); ok { + err := p.Init() + if err != nil { + return err + } + } + return nil +} + func (r *RunningInput) MakeMetric(metric telegraf.Metric) telegraf.Metric { if ok := r.Config.Filter.Select(metric); !ok { r.metricFiltered(metric) diff --git a/internal/models/running_output.go b/internal/models/running_output.go index ff2b88e2a..438ecd480 100644 --- a/internal/models/running_output.go +++ b/internal/models/running_output.go @@ -97,6 +97,16 @@ func (ro *RunningOutput) metricFiltered(metric telegraf.Metric) { metric.Drop() } +func (ro *RunningOutput) Init() error { + if p, ok := ro.Output.(telegraf.Initializer); ok { + err := p.Init() + if err != nil { + return err + } + } + return nil +} + // AddMetric adds a metric to the output. // // Takes ownership of metric diff --git a/internal/models/running_processor.go b/internal/models/running_processor.go index 38369d03b..90d32fde5 100644 --- a/internal/models/running_processor.go +++ b/internal/models/running_processor.go @@ -40,6 +40,16 @@ func containsMetric(item telegraf.Metric, metrics []telegraf.Metric) bool { return false } +func (rp *RunningProcessor) Init() error { + if p, ok := rp.Processor.(telegraf.Initializer); ok { + err := p.Init() + if err != nil { + return err + } + } + return nil +} + func (rp *RunningProcessor) Apply(in ...telegraf.Metric) []telegraf.Metric { rp.Lock() defer rp.Unlock() diff --git a/plugins/inputs/http/http.go b/plugins/inputs/http/http.go index 6d2d528ba..34db9d287 100644 --- a/plugins/inputs/http/http.go +++ b/plugins/inputs/http/http.go @@ -1,7 +1,6 @@ package http import ( - "errors" "fmt" "io" "io/ioutil" @@ -89,27 +88,25 @@ func (*HTTP) Description() string { return "Read formatted metrics from one or more HTTP endpoints" } +func (h *HTTP) Init() error { + tlsCfg, err := h.ClientConfig.TLSConfig() + if err != nil { + return err + } + + h.client = &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: tlsCfg, + Proxy: http.ProxyFromEnvironment, + }, + Timeout: h.Timeout.Duration, + } + return nil +} + // Gather takes in an accumulator and adds the metrics that the Input // gathers. This is called every "interval" func (h *HTTP) Gather(acc telegraf.Accumulator) error { - if h.parser == nil { - return errors.New("Parser is not set") - } - - if h.client == nil { - tlsCfg, err := h.ClientConfig.TLSConfig() - if err != nil { - return err - } - h.client = &http.Client{ - Transport: &http.Transport{ - TLSClientConfig: tlsCfg, - Proxy: http.ProxyFromEnvironment, - }, - Timeout: h.Timeout.Duration, - } - } - var wg sync.WaitGroup for _, u := range h.URLs { wg.Add(1) diff --git a/plugins/inputs/http/http_test.go b/plugins/inputs/http/http_test.go index 7ac05e135..21eff6265 100644 --- a/plugins/inputs/http/http_test.go +++ b/plugins/inputs/http/http_test.go @@ -37,6 +37,7 @@ func TestHTTPwithJSONFormat(t *testing.T) { plugin.SetParser(p) var acc testutil.Accumulator + plugin.Init() require.NoError(t, acc.GatherError(plugin.Gather)) require.Len(t, acc.Metrics, 1) @@ -78,6 +79,7 @@ func TestHTTPHeaders(t *testing.T) { plugin.SetParser(p) var acc testutil.Accumulator + plugin.Init() require.NoError(t, acc.GatherError(plugin.Gather)) } @@ -100,6 +102,7 @@ func TestInvalidStatusCode(t *testing.T) { plugin.SetParser(p) var acc testutil.Accumulator + plugin.Init() require.Error(t, acc.GatherError(plugin.Gather)) } @@ -125,28 +128,10 @@ func TestMethod(t *testing.T) { plugin.SetParser(p) var acc testutil.Accumulator + plugin.Init() require.NoError(t, acc.GatherError(plugin.Gather)) } -func TestParserNotSet(t *testing.T) { - fakeServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.URL.Path == "/endpoint" { - _, _ = w.Write([]byte(simpleJSON)) - } else { - w.WriteHeader(http.StatusNotFound) - } - })) - defer fakeServer.Close() - - url := fakeServer.URL + "/endpoint" - plugin := &plugin.HTTP{ - URLs: []string{url}, - } - - var acc testutil.Accumulator - require.Error(t, acc.GatherError(plugin.Gather)) -} - const simpleJSON = ` { "a": 1.2 @@ -237,6 +222,7 @@ func TestBodyAndContentEncoding(t *testing.T) { tt.plugin.SetParser(parser) var acc testutil.Accumulator + tt.plugin.Init() err = tt.plugin.Gather(&acc) require.NoError(t, err) })