From b73a232a6a6a11f41041db6fe2714cfd576027da Mon Sep 17 00:00:00 2001 From: Steven Soroka Date: Mon, 4 May 2020 14:09:10 -0400 Subject: [PATCH] Support Go execd plugins with shim (#7283) --- EXTERNAL_PLUGINS.md | 8 + agent/accumulator_test.go | 2 +- agent/agent.go | 4 +- agent/agent_test.go | 26 +- cmd/telegraf/telegraf.go | 2 +- .../config => config}/aws/credentials.go | 0 {internal/config => config}/config.go | 2 +- {internal/config => config}/config_test.go | 2 +- .../testdata/inline_table.toml | 0 .../testdata/invalid_field.toml | 0 .../testdata/non_slice_slice.toml | 0 .../testdata/single_plugin.toml | 0 .../testdata/single_plugin_env_vars.toml | 0 .../testdata/slice_comment.toml | 0 .../testdata/special_types.toml | 0 .../testdata/subconfig/exec.conf | 0 .../testdata/subconfig/memcached.conf | 0 .../testdata/subconfig/procstat.conf | 0 .../testdata/telegraf-agent.toml | 0 .../testdata/wrong_field_type.toml | 0 .../testdata/wrong_field_type2.toml | 0 config/types.go | 88 ++++++ go.mod | 1 + {internal/models => models}/buffer.go | 0 {internal/models => models}/buffer_test.go | 0 {internal/models => models}/filter.go | 0 {internal/models => models}/filter_test.go | 0 {internal/models => models}/log.go | 0 {internal/models => models}/log_test.go | 0 {internal/models => models}/makemetric.go | 0 .../models => models}/running_aggregator.go | 0 .../running_aggregator_test.go | 0 {internal/models => models}/running_input.go | 0 .../models => models}/running_input_test.go | 0 {internal/models => models}/running_output.go | 0 .../models => models}/running_output_test.go | 0 .../models => models}/running_processor.go | 0 .../running_processor_test.go | 0 .../cloud_pubsub_push/pubsub_push_test.go | 2 +- plugins/inputs/cloudwatch/cloudwatch.go | 2 +- plugins/inputs/execd/examples/count.rb | 10 +- plugins/inputs/execd/examples/count.sh | 8 +- plugins/inputs/execd/execd.go | 94 +++--- .../execd/{execd_unix.go => execd_posix.go} | 4 +- plugins/inputs/execd/execd_test.go | 85 ++++++ .../execd/{execd_win.go => execd_windows.go} | 4 +- plugins/inputs/execd/shim/README.md | 48 +++ plugins/inputs/execd/shim/example/cmd/main.go | 60 ++++ .../inputs/execd/shim/example/cmd/plugin.conf | 2 + plugins/inputs/execd/shim/goshim.go | 278 ++++++++++++++++++ .../inputs/execd/shim/goshim_notwindows.go | 14 + plugins/inputs/execd/shim/goshim_windows.go | 13 + plugins/inputs/execd/shim/input.go | 20 ++ plugins/inputs/execd/shim/shim_posix_test.go | 76 +++++ plugins/inputs/execd/shim/shim_test.go | 119 ++++++++ .../kinesis_consumer/kinesis_consumer.go | 2 +- plugins/outputs/cloudwatch/cloudwatch.go | 2 +- plugins/outputs/kinesis/kinesis.go | 2 +- 58 files changed, 915 insertions(+), 65 deletions(-) create mode 100644 EXTERNAL_PLUGINS.md rename {internal/config => config}/aws/credentials.go (100%) rename {internal/config => config}/config.go (99%) rename {internal/config => config}/config_test.go (99%) rename {internal/config => config}/testdata/inline_table.toml (100%) rename {internal/config => config}/testdata/invalid_field.toml (100%) rename {internal/config => config}/testdata/non_slice_slice.toml (100%) rename {internal/config => config}/testdata/single_plugin.toml (100%) rename {internal/config => config}/testdata/single_plugin_env_vars.toml (100%) rename {internal/config => config}/testdata/slice_comment.toml (100%) rename {internal/config => config}/testdata/special_types.toml (100%) rename {internal/config => config}/testdata/subconfig/exec.conf (100%) rename {internal/config => config}/testdata/subconfig/memcached.conf (100%) rename {internal/config => config}/testdata/subconfig/procstat.conf (100%) rename {internal/config => config}/testdata/telegraf-agent.toml (100%) rename {internal/config => config}/testdata/wrong_field_type.toml (100%) rename {internal/config => config}/testdata/wrong_field_type2.toml (100%) create mode 100644 config/types.go rename {internal/models => models}/buffer.go (100%) rename {internal/models => models}/buffer_test.go (100%) rename {internal/models => models}/filter.go (100%) rename {internal/models => models}/filter_test.go (100%) rename {internal/models => models}/log.go (100%) rename {internal/models => models}/log_test.go (100%) rename {internal/models => models}/makemetric.go (100%) rename {internal/models => models}/running_aggregator.go (100%) rename {internal/models => models}/running_aggregator_test.go (100%) rename {internal/models => models}/running_input.go (100%) rename {internal/models => models}/running_input_test.go (100%) rename {internal/models => models}/running_output.go (100%) rename {internal/models => models}/running_output_test.go (100%) rename {internal/models => models}/running_processor.go (100%) rename {internal/models => models}/running_processor_test.go (100%) rename plugins/inputs/execd/{execd_unix.go => execd_posix.go} (81%) create mode 100644 plugins/inputs/execd/execd_test.go rename plugins/inputs/execd/{execd_win.go => execd_windows.go} (74%) create mode 100644 plugins/inputs/execd/shim/README.md create mode 100644 plugins/inputs/execd/shim/example/cmd/main.go create mode 100644 plugins/inputs/execd/shim/example/cmd/plugin.conf create mode 100644 plugins/inputs/execd/shim/goshim.go create mode 100644 plugins/inputs/execd/shim/goshim_notwindows.go create mode 100644 plugins/inputs/execd/shim/goshim_windows.go create mode 100644 plugins/inputs/execd/shim/input.go create mode 100644 plugins/inputs/execd/shim/shim_posix_test.go create mode 100644 plugins/inputs/execd/shim/shim_test.go diff --git a/EXTERNAL_PLUGINS.md b/EXTERNAL_PLUGINS.md new file mode 100644 index 000000000..273c33fbb --- /dev/null +++ b/EXTERNAL_PLUGINS.md @@ -0,0 +1,8 @@ +# External Plugins + +This is a list of plugins that can be compiled outside of Telegraf and used via the execd input. + +Pull requests welcome. + +## Inputs +- [rand](https://github.com/ssoroka/rand) - Generate random numbers \ No newline at end of file diff --git a/agent/accumulator_test.go b/agent/accumulator_test.go index 496d131f4..38a7e047c 100644 --- a/agent/accumulator_test.go +++ b/agent/accumulator_test.go @@ -9,7 +9,7 @@ import ( "time" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/internal/models" + "github.com/influxdata/telegraf/models" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) diff --git a/agent/agent.go b/agent/agent.go index 863309f28..b68c55d13 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -10,9 +10,9 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/internal" - "github.com/influxdata/telegraf/internal/config" - "github.com/influxdata/telegraf/internal/models" + "github.com/influxdata/telegraf/models" "github.com/influxdata/telegraf/plugins/serializers/influx" ) diff --git a/agent/agent_test.go b/agent/agent_test.go index c822a236b..9cc631b17 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -4,7 +4,7 @@ import ( "testing" "time" - "github.com/influxdata/telegraf/internal/config" + "github.com/influxdata/telegraf/config" _ "github.com/influxdata/telegraf/plugins/inputs/all" _ "github.com/influxdata/telegraf/plugins/outputs/all" "github.com/stretchr/testify/assert" @@ -22,35 +22,35 @@ func TestAgent_OmitHostname(t *testing.T) { func TestAgent_LoadPlugin(t *testing.T) { c := config.NewConfig() c.InputFilters = []string{"mysql"} - err := c.LoadConfig("../internal/config/testdata/telegraf-agent.toml") + err := c.LoadConfig("../config/testdata/telegraf-agent.toml") assert.NoError(t, err) a, _ := NewAgent(c) assert.Equal(t, 1, len(a.Config.Inputs)) c = config.NewConfig() c.InputFilters = []string{"foo"} - err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml") + err = c.LoadConfig("../config/testdata/telegraf-agent.toml") assert.NoError(t, err) a, _ = NewAgent(c) assert.Equal(t, 0, len(a.Config.Inputs)) c = config.NewConfig() c.InputFilters = []string{"mysql", "foo"} - err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml") + err = c.LoadConfig("../config/testdata/telegraf-agent.toml") assert.NoError(t, err) a, _ = NewAgent(c) assert.Equal(t, 1, len(a.Config.Inputs)) c = config.NewConfig() c.InputFilters = []string{"mysql", "redis"} - err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml") + err = c.LoadConfig("../config/testdata/telegraf-agent.toml") assert.NoError(t, err) a, _ = NewAgent(c) assert.Equal(t, 2, len(a.Config.Inputs)) c = config.NewConfig() c.InputFilters = []string{"mysql", "foo", "redis", "bar"} - err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml") + err = c.LoadConfig("../config/testdata/telegraf-agent.toml") assert.NoError(t, err) a, _ = NewAgent(c) assert.Equal(t, 2, len(a.Config.Inputs)) @@ -59,42 +59,42 @@ func TestAgent_LoadPlugin(t *testing.T) { func TestAgent_LoadOutput(t *testing.T) { c := config.NewConfig() c.OutputFilters = []string{"influxdb"} - err := c.LoadConfig("../internal/config/testdata/telegraf-agent.toml") + err := c.LoadConfig("../config/testdata/telegraf-agent.toml") assert.NoError(t, err) a, _ := NewAgent(c) assert.Equal(t, 2, len(a.Config.Outputs)) c = config.NewConfig() c.OutputFilters = []string{"kafka"} - err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml") + err = c.LoadConfig("../config/testdata/telegraf-agent.toml") assert.NoError(t, err) a, _ = NewAgent(c) assert.Equal(t, 1, len(a.Config.Outputs)) c = config.NewConfig() c.OutputFilters = []string{} - err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml") + err = c.LoadConfig("../config/testdata/telegraf-agent.toml") assert.NoError(t, err) a, _ = NewAgent(c) assert.Equal(t, 3, len(a.Config.Outputs)) c = config.NewConfig() c.OutputFilters = []string{"foo"} - err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml") + err = c.LoadConfig("../config/testdata/telegraf-agent.toml") assert.NoError(t, err) a, _ = NewAgent(c) assert.Equal(t, 0, len(a.Config.Outputs)) c = config.NewConfig() c.OutputFilters = []string{"influxdb", "foo"} - err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml") + err = c.LoadConfig("../config/testdata/telegraf-agent.toml") assert.NoError(t, err) a, _ = NewAgent(c) assert.Equal(t, 2, len(a.Config.Outputs)) c = config.NewConfig() c.OutputFilters = []string{"influxdb", "kafka"} - err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml") + err = c.LoadConfig("../config/testdata/telegraf-agent.toml") assert.NoError(t, err) assert.Equal(t, 3, len(c.Outputs)) a, _ = NewAgent(c) @@ -102,7 +102,7 @@ func TestAgent_LoadOutput(t *testing.T) { c = config.NewConfig() c.OutputFilters = []string{"influxdb", "foo", "kafka", "bar"} - err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml") + err = c.LoadConfig("../config/testdata/telegraf-agent.toml") assert.NoError(t, err) a, _ = NewAgent(c) assert.Equal(t, 3, len(a.Config.Outputs)) diff --git a/cmd/telegraf/telegraf.go b/cmd/telegraf/telegraf.go index c1f7344da..4f51bc2e1 100644 --- a/cmd/telegraf/telegraf.go +++ b/cmd/telegraf/telegraf.go @@ -16,8 +16,8 @@ import ( "time" "github.com/influxdata/telegraf/agent" + "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/internal" - "github.com/influxdata/telegraf/internal/config" "github.com/influxdata/telegraf/internal/goplugin" "github.com/influxdata/telegraf/logger" _ "github.com/influxdata/telegraf/plugins/aggregators/all" diff --git a/internal/config/aws/credentials.go b/config/aws/credentials.go similarity index 100% rename from internal/config/aws/credentials.go rename to config/aws/credentials.go diff --git a/internal/config/config.go b/config/config.go similarity index 99% rename from internal/config/config.go rename to config/config.go index c2335fac2..0ebb9e29b 100644 --- a/internal/config/config.go +++ b/config/config.go @@ -20,7 +20,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" - "github.com/influxdata/telegraf/internal/models" + "github.com/influxdata/telegraf/models" "github.com/influxdata/telegraf/plugins/aggregators" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/outputs" diff --git a/internal/config/config_test.go b/config/config_test.go similarity index 99% rename from internal/config/config_test.go rename to config/config_test.go index 9d42177cd..c4a960265 100644 --- a/internal/config/config_test.go +++ b/config/config_test.go @@ -6,7 +6,7 @@ import ( "time" "github.com/influxdata/telegraf/internal" - "github.com/influxdata/telegraf/internal/models" + "github.com/influxdata/telegraf/models" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs/exec" "github.com/influxdata/telegraf/plugins/inputs/http_listener_v2" diff --git a/internal/config/testdata/inline_table.toml b/config/testdata/inline_table.toml similarity index 100% rename from internal/config/testdata/inline_table.toml rename to config/testdata/inline_table.toml diff --git a/internal/config/testdata/invalid_field.toml b/config/testdata/invalid_field.toml similarity index 100% rename from internal/config/testdata/invalid_field.toml rename to config/testdata/invalid_field.toml diff --git a/internal/config/testdata/non_slice_slice.toml b/config/testdata/non_slice_slice.toml similarity index 100% rename from internal/config/testdata/non_slice_slice.toml rename to config/testdata/non_slice_slice.toml diff --git a/internal/config/testdata/single_plugin.toml b/config/testdata/single_plugin.toml similarity index 100% rename from internal/config/testdata/single_plugin.toml rename to config/testdata/single_plugin.toml diff --git a/internal/config/testdata/single_plugin_env_vars.toml b/config/testdata/single_plugin_env_vars.toml similarity index 100% rename from internal/config/testdata/single_plugin_env_vars.toml rename to config/testdata/single_plugin_env_vars.toml diff --git a/internal/config/testdata/slice_comment.toml b/config/testdata/slice_comment.toml similarity index 100% rename from internal/config/testdata/slice_comment.toml rename to config/testdata/slice_comment.toml diff --git a/internal/config/testdata/special_types.toml b/config/testdata/special_types.toml similarity index 100% rename from internal/config/testdata/special_types.toml rename to config/testdata/special_types.toml diff --git a/internal/config/testdata/subconfig/exec.conf b/config/testdata/subconfig/exec.conf similarity index 100% rename from internal/config/testdata/subconfig/exec.conf rename to config/testdata/subconfig/exec.conf diff --git a/internal/config/testdata/subconfig/memcached.conf b/config/testdata/subconfig/memcached.conf similarity index 100% rename from internal/config/testdata/subconfig/memcached.conf rename to config/testdata/subconfig/memcached.conf diff --git a/internal/config/testdata/subconfig/procstat.conf b/config/testdata/subconfig/procstat.conf similarity index 100% rename from internal/config/testdata/subconfig/procstat.conf rename to config/testdata/subconfig/procstat.conf diff --git a/internal/config/testdata/telegraf-agent.toml b/config/testdata/telegraf-agent.toml similarity index 100% rename from internal/config/testdata/telegraf-agent.toml rename to config/testdata/telegraf-agent.toml diff --git a/internal/config/testdata/wrong_field_type.toml b/config/testdata/wrong_field_type.toml similarity index 100% rename from internal/config/testdata/wrong_field_type.toml rename to config/testdata/wrong_field_type.toml diff --git a/internal/config/testdata/wrong_field_type2.toml b/config/testdata/wrong_field_type2.toml similarity index 100% rename from internal/config/testdata/wrong_field_type2.toml rename to config/testdata/wrong_field_type2.toml diff --git a/config/types.go b/config/types.go new file mode 100644 index 000000000..5703c8411 --- /dev/null +++ b/config/types.go @@ -0,0 +1,88 @@ +package config + +import ( + "bytes" + "strconv" + "time" + + "github.com/alecthomas/units" +) + +// Duration is a time.Duration +type Duration time.Duration + +// Size is an int64 +type Size int64 + +// Number is a float +type Number float64 + +// UnmarshalTOML parses the duration from the TOML config file +func (d Duration) UnmarshalTOML(b []byte) error { + var err error + b = bytes.Trim(b, `'`) + + // see if we can directly convert it + dur, err := time.ParseDuration(string(b)) + if err == nil { + d = Duration(dur) + return nil + } + + // Parse string duration, ie, "1s" + if uq, err := strconv.Unquote(string(b)); err == nil && len(uq) > 0 { + dur, err := time.ParseDuration(uq) + if err == nil { + d = Duration(dur) + return nil + } + } + + // First try parsing as integer seconds + sI, err := strconv.ParseInt(string(b), 10, 64) + if err == nil { + dur := time.Second * time.Duration(sI) + d = Duration(dur) + return nil + } + // Second try parsing as float seconds + sF, err := strconv.ParseFloat(string(b), 64) + if err == nil { + dur := time.Second * time.Duration(sF) + d = Duration(dur) + return nil + } + + return nil +} + +func (s Size) UnmarshalTOML(b []byte) error { + var err error + b = bytes.Trim(b, `'`) + + val, err := strconv.ParseInt(string(b), 10, 64) + if err == nil { + s = Size(val) + return nil + } + uq, err := strconv.Unquote(string(b)) + if err != nil { + return err + } + val, err = units.ParseStrictBytes(uq) + if err != nil { + return err + } + s = Size(val) + return nil +} + +func (n Number) UnmarshalTOML(b []byte) error { + value, err := strconv.ParseFloat(string(b), 64) + if err != nil { + return err + } + + n = Number(value) + return nil +} diff --git a/go.mod b/go.mod index 61515f70c..4986adc77 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/Azure/azure-storage-queue-go v0.0.0-20181215014128-6ed74e755687 github.com/Azure/go-autorest/autorest v0.9.3 github.com/Azure/go-autorest/autorest/azure/auth v0.4.2 + github.com/BurntSushi/toml v0.3.1 github.com/Mellanox/rdmamap v0.0.0-20191106181932-7c3c4763a6ee github.com/Microsoft/ApplicationInsights-Go v0.4.2 github.com/Microsoft/go-winio v0.4.9 // indirect diff --git a/internal/models/buffer.go b/models/buffer.go similarity index 100% rename from internal/models/buffer.go rename to models/buffer.go diff --git a/internal/models/buffer_test.go b/models/buffer_test.go similarity index 100% rename from internal/models/buffer_test.go rename to models/buffer_test.go diff --git a/internal/models/filter.go b/models/filter.go similarity index 100% rename from internal/models/filter.go rename to models/filter.go diff --git a/internal/models/filter_test.go b/models/filter_test.go similarity index 100% rename from internal/models/filter_test.go rename to models/filter_test.go diff --git a/internal/models/log.go b/models/log.go similarity index 100% rename from internal/models/log.go rename to models/log.go diff --git a/internal/models/log_test.go b/models/log_test.go similarity index 100% rename from internal/models/log_test.go rename to models/log_test.go diff --git a/internal/models/makemetric.go b/models/makemetric.go similarity index 100% rename from internal/models/makemetric.go rename to models/makemetric.go diff --git a/internal/models/running_aggregator.go b/models/running_aggregator.go similarity index 100% rename from internal/models/running_aggregator.go rename to models/running_aggregator.go diff --git a/internal/models/running_aggregator_test.go b/models/running_aggregator_test.go similarity index 100% rename from internal/models/running_aggregator_test.go rename to models/running_aggregator_test.go diff --git a/internal/models/running_input.go b/models/running_input.go similarity index 100% rename from internal/models/running_input.go rename to models/running_input.go diff --git a/internal/models/running_input_test.go b/models/running_input_test.go similarity index 100% rename from internal/models/running_input_test.go rename to models/running_input_test.go diff --git a/internal/models/running_output.go b/models/running_output.go similarity index 100% rename from internal/models/running_output.go rename to models/running_output.go diff --git a/internal/models/running_output_test.go b/models/running_output_test.go similarity index 100% rename from internal/models/running_output_test.go rename to models/running_output_test.go diff --git a/internal/models/running_processor.go b/models/running_processor.go similarity index 100% rename from internal/models/running_processor.go rename to models/running_processor.go diff --git a/internal/models/running_processor_test.go b/models/running_processor_test.go similarity index 100% rename from internal/models/running_processor_test.go rename to models/running_processor_test.go diff --git a/plugins/inputs/cloud_pubsub_push/pubsub_push_test.go b/plugins/inputs/cloud_pubsub_push/pubsub_push_test.go index 308a8181d..ae7601b20 100644 --- a/plugins/inputs/cloud_pubsub_push/pubsub_push_test.go +++ b/plugins/inputs/cloud_pubsub_push/pubsub_push_test.go @@ -16,7 +16,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/agent" "github.com/influxdata/telegraf/internal" - "github.com/influxdata/telegraf/internal/models" + "github.com/influxdata/telegraf/models" "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/testutil" ) diff --git a/plugins/inputs/cloudwatch/cloudwatch.go b/plugins/inputs/cloudwatch/cloudwatch.go index 3148e2c75..cb0e10ac0 100644 --- a/plugins/inputs/cloudwatch/cloudwatch.go +++ b/plugins/inputs/cloudwatch/cloudwatch.go @@ -12,9 +12,9 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/cloudwatch" "github.com/influxdata/telegraf" + internalaws "github.com/influxdata/telegraf/config/aws" "github.com/influxdata/telegraf/filter" "github.com/influxdata/telegraf/internal" - internalaws "github.com/influxdata/telegraf/internal/config/aws" "github.com/influxdata/telegraf/internal/limiter" "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/inputs" diff --git a/plugins/inputs/execd/examples/count.rb b/plugins/inputs/execd/examples/count.rb index 220848d64..6b60fbc17 100755 --- a/plugins/inputs/execd/examples/count.rb +++ b/plugins/inputs/execd/examples/count.rb @@ -4,8 +4,16 @@ counter = 0 +def time_ns_str(t) + ns = t.nsec.to_s + (9 - ns.size).times do + ns = "0" + ns # left pad + end + t.to_i.to_s + ns +end + loop do - puts "counter_ruby count=#{counter}" + puts "counter_ruby count=#{counter} #{time_ns_str(Time.now)}" STDOUT.flush counter += 1 diff --git a/plugins/inputs/execd/examples/count.sh b/plugins/inputs/execd/examples/count.sh index aa6932a80..bbbe8619c 100755 --- a/plugins/inputs/execd/examples/count.sh +++ b/plugins/inputs/execd/examples/count.sh @@ -1,12 +1,12 @@ -#!/bin/bash +#!/bin/sh ## Example in bash using STDIN signaling counter=0 -while read; do +while read LINE; do echo "counter_bash count=${counter}" - let counter=counter+1 + counter=$((counter+1)) done -(>&2 echo "terminate") +trap "echo terminate 1>&2" EXIT \ No newline at end of file diff --git a/plugins/inputs/execd/execd.go b/plugins/inputs/execd/execd.go index e852d045e..90a5ceffb 100644 --- a/plugins/inputs/execd/execd.go +++ b/plugins/inputs/execd/execd.go @@ -11,6 +11,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" @@ -43,12 +44,14 @@ const sampleConfig = ` type Execd struct { Command []string Signal string - RestartDelay internal.Duration + RestartDelay config.Duration acc telegraf.Accumulator cmd *exec.Cmd parser parsers.Parser stdin io.WriteCloser + stdout io.ReadCloser + stderr io.ReadCloser cancel context.CancelFunc wg sync.WaitGroup } @@ -69,13 +72,17 @@ func (e *Execd) Start(acc telegraf.Accumulator) error { e.acc = acc if len(e.Command) == 0 { - return fmt.Errorf("E! [inputs.execd] FATAL no command specified") + return fmt.Errorf("FATAL no command specified") } e.wg.Add(1) - var ctx context.Context - ctx, e.cancel = context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(context.Background()) + e.cancel = cancel + + if err := e.cmdStart(); err != nil { + return err + } go func() { e.cmdLoop(ctx) @@ -90,79 +97,98 @@ func (e *Execd) Stop() { e.wg.Wait() } -func (e *Execd) cmdLoop(ctx context.Context) { +// cmdLoop watches an already running process, restarting it when appropriate. +func (e *Execd) cmdLoop(ctx context.Context) error { for { // Use a buffered channel to ensure goroutine below can exit // if `ctx.Done` is selected and nothing reads on `done` anymore done := make(chan error, 1) go func() { - done <- e.cmdRun() + done <- e.cmdWait() }() select { case <-ctx.Done(): - e.stdin.Close() - // Immediately exit process but with a graceful shutdown - // period before killing - internal.WaitTimeout(e.cmd, 200*time.Millisecond) - return + if e.stdin != nil { + e.stdin.Close() + // Immediately exit process but with a graceful shutdown + // period before killing + internal.WaitTimeout(e.cmd, 200*time.Millisecond) + } + return nil case err := <-done: - log.Printf("E! [inputs.execd] Process %s terminated: %s", e.Command, err) + log.Printf("Process %s terminated: %s", e.Command, err) + if isQuitting(ctx) { + return err + } } - log.Printf("E! [inputs.execd] Restarting in %s...", e.RestartDelay.Duration) + log.Printf("Restarting in %s...", time.Duration(e.RestartDelay)) select { case <-ctx.Done(): - return - case <-time.After(e.RestartDelay.Duration): + return nil + case <-time.After(time.Duration(e.RestartDelay)): // Continue the loop and restart the process + if err := e.cmdStart(); err != nil { + return err + } } } } -func (e *Execd) cmdRun() error { - var wg sync.WaitGroup +func isQuitting(ctx context.Context) bool { + select { + case <-ctx.Done(): + return true + default: + return false + } +} +func (e *Execd) cmdStart() (err error) { if len(e.Command) > 1 { e.cmd = exec.Command(e.Command[0], e.Command[1:]...) } else { e.cmd = exec.Command(e.Command[0]) } - stdin, err := e.cmd.StdinPipe() + e.stdin, err = e.cmd.StdinPipe() if err != nil { - return fmt.Errorf("E! [inputs.execd] Error opening stdin pipe: %s", err) + return fmt.Errorf("Error opening stdin pipe: %s", err) } - e.stdin = stdin - - stdout, err := e.cmd.StdoutPipe() + e.stdout, err = e.cmd.StdoutPipe() if err != nil { - return fmt.Errorf("E! [inputs.execd] Error opening stdout pipe: %s", err) + return fmt.Errorf("Error opening stdout pipe: %s", err) } - stderr, err := e.cmd.StderrPipe() + e.stderr, err = e.cmd.StderrPipe() if err != nil { - return fmt.Errorf("E! [inputs.execd] Error opening stderr pipe: %s", err) + return fmt.Errorf("Error opening stderr pipe: %s", err) } - log.Printf("D! [inputs.execd] Starting process: %s", e.Command) + log.Printf("Starting process: %s", e.Command) err = e.cmd.Start() if err != nil { - return fmt.Errorf("E! [inputs.execd] Error starting process: %s", err) + return fmt.Errorf("Error starting process: %s", err) } + return nil +} + +func (e *Execd) cmdWait() error { + var wg sync.WaitGroup wg.Add(2) go func() { - e.cmdReadOut(stdout) + e.cmdReadOut(e.stdout) wg.Done() }() go func() { - e.cmdReadErr(stderr) + e.cmdReadErr(e.stderr) wg.Done() }() @@ -176,7 +202,7 @@ func (e *Execd) cmdReadOut(out io.Reader) { for scanner.Scan() { metrics, err := e.parser.Parse(scanner.Bytes()) if err != nil { - e.acc.AddError(fmt.Errorf("E! [inputs.execd] Parse error: %s", err)) + e.acc.AddError(fmt.Errorf("Parse error: %s", err)) } for _, metric := range metrics { @@ -185,7 +211,7 @@ func (e *Execd) cmdReadOut(out io.Reader) { } if err := scanner.Err(); err != nil { - e.acc.AddError(fmt.Errorf("E! [inputs.execd] Error reading stdout: %s", err)) + e.acc.AddError(fmt.Errorf("Error reading stdout: %s", err)) } } @@ -193,11 +219,11 @@ func (e *Execd) cmdReadErr(out io.Reader) { scanner := bufio.NewScanner(out) for scanner.Scan() { - log.Printf("E! [inputs.execd] stderr: %q", scanner.Text()) + log.Printf("stderr: %q", scanner.Text()) } if err := scanner.Err(); err != nil { - e.acc.AddError(fmt.Errorf("E! [inputs.execd] Error reading stderr: %s", err)) + e.acc.AddError(fmt.Errorf("Error reading stderr: %s", err)) } } @@ -205,7 +231,7 @@ func init() { inputs.Add("execd", func() telegraf.Input { return &Execd{ Signal: "none", - RestartDelay: internal.Duration{Duration: 10 * time.Second}, + RestartDelay: config.Duration(10 * time.Second), } }) } diff --git a/plugins/inputs/execd/execd_unix.go b/plugins/inputs/execd/execd_posix.go similarity index 81% rename from plugins/inputs/execd/execd_unix.go rename to plugins/inputs/execd/execd_posix.go index a092cfc80..919447260 100644 --- a/plugins/inputs/execd/execd_unix.go +++ b/plugins/inputs/execd/execd_posix.go @@ -23,7 +23,9 @@ func (e *Execd) Gather(acc telegraf.Accumulator) error { case "SIGUSR2": e.cmd.Process.Signal(syscall.SIGUSR2) case "STDIN": - io.WriteString(e.stdin, "\n") + if _, err := io.WriteString(e.stdin, "\n"); err != nil { + return fmt.Errorf("Error writing to stdin: %s", err) + } case "none": default: return fmt.Errorf("invalid signal: %s", e.Signal) diff --git a/plugins/inputs/execd/execd_test.go b/plugins/inputs/execd/execd_test.go new file mode 100644 index 000000000..b78075e95 --- /dev/null +++ b/plugins/inputs/execd/execd_test.go @@ -0,0 +1,85 @@ +// +build !windows + +package execd + +import ( + "testing" + "time" + + "github.com/influxdata/telegraf/agent" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/models" + "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf/plugins/parsers" + + "github.com/influxdata/telegraf" +) + +func TestExternalInputWorks(t *testing.T) { + jsonParser, err := parsers.NewInfluxParser() + require.NoError(t, err) + + e := &Execd{ + Command: []string{shell(), fileShellScriptPath()}, + RestartDelay: config.Duration(5 * time.Second), + parser: jsonParser, + Signal: "STDIN", + } + + metrics := make(chan telegraf.Metric, 10) + defer close(metrics) + acc := agent.NewAccumulator(&TestMetricMaker{}, metrics) + + require.NoError(t, e.Start(acc)) + require.NoError(t, e.Gather(acc)) + e.Stop() + + // grab a metric and make sure it's a thing + m := readChanWithTimeout(t, metrics, 10*time.Second) + + require.Equal(t, "counter_bash", m.Name()) + val, ok := m.GetField("count") + require.True(t, ok) + require.Equal(t, float64(0), val) + // test that a later gather will not panic + e.Gather(acc) +} + +func readChanWithTimeout(t *testing.T, metrics chan telegraf.Metric, timeout time.Duration) telegraf.Metric { + to := time.NewTimer(timeout) + defer to.Stop() + select { + case m := <-metrics: + return m + case <-to.C: + require.FailNow(t, "timeout waiting for metric") + } + return nil +} + +func fileShellScriptPath() string { + return "./examples/count.sh" +} + +func shell() string { + return "sh" +} + +type TestMetricMaker struct{} + +func (tm *TestMetricMaker) Name() string { + return "TestPlugin" +} + +func (tm *TestMetricMaker) LogName() string { + return tm.Name() +} + +func (tm *TestMetricMaker) MakeMetric(metric telegraf.Metric) telegraf.Metric { + return metric +} + +func (tm *TestMetricMaker) Log() telegraf.Logger { + return models.NewLogger("TestPlugin", "test", "") +} diff --git a/plugins/inputs/execd/execd_win.go b/plugins/inputs/execd/execd_windows.go similarity index 74% rename from plugins/inputs/execd/execd_win.go rename to plugins/inputs/execd/execd_windows.go index 85ced4a6a..443d8f686 100644 --- a/plugins/inputs/execd/execd_win.go +++ b/plugins/inputs/execd/execd_windows.go @@ -16,7 +16,9 @@ func (e *Execd) Gather(acc telegraf.Accumulator) error { switch e.Signal { case "STDIN": - io.WriteString(e.stdin, "\n") + if _, err := io.WriteString(e.stdin, "\n"); err != nil { + return fmt.Errorf("Error writing to stdin: %s", err) + } case "none": default: return fmt.Errorf("invalid signal: %s", e.Signal) diff --git a/plugins/inputs/execd/shim/README.md b/plugins/inputs/execd/shim/README.md new file mode 100644 index 000000000..f955ef15f --- /dev/null +++ b/plugins/inputs/execd/shim/README.md @@ -0,0 +1,48 @@ +# Telegraf Execd Go Shim + +The goal of this _shim_ is to make it trivial to extract an internal input plugin +out to a stand-alone repo for the purpose of compiling it as a separate app and +running it from the inputs.execd plugin. + +The execd-shim is still experimental and the interface may change in the future. +Especially as the concept expands to prcoessors, aggregators, and outputs. + +## Steps to externalize a plugin + +1. Move the project to an external repo, optionally preserving the + _plugins/inputs/plugin_name_ folder structure. For an example of what this might + look at, take a look at [ssoroka/rand](https://github.com/ssoroka/rand) or + [danielnelson/telegraf-plugins](https://github.com/danielnelson/telegraf-plugins) +1. Copy [main.go](./example/cmd/main.go) into your project under the cmd folder. + This will be the entrypoint to the plugin when run as a stand-alone program, and + it will call the shim code for you to make that happen. +1. Edit the main.go file to import your plugin. Within Telegraf this would have + been done in an all.go file, but here we don't split the two apart, and the change + just goes in the top of main.go. If you skip this step, your plugin will do nothing. +1. Optionally add a [plugin.conf](./example/cmd/plugin.conf) for configuration + specific to your plugin. Note that this config file **must be separate from the + rest of the config for Telegraf, and must not be in a shared directory where + Telegraf is expecting to load all configs**. If Telegraf reads this config file + it will not know which plugin it relates to. + +## Steps to build and run your plugin + +1. Build the cmd/main.go. For my rand project this looks like `go build -o rand cmd/main.go` +1. Test out the binary if you haven't done this yet. eg `./rand -config plugin.conf` + Depending on your polling settings and whether you implemented a service plugin or + an input gathering plugin, you may see data right away, or you may have to hit enter + first, or wait for your poll duration to elapse, but the metrics will be written to + STDOUT. Ctrl-C to end your test. +1. Configure Telegraf to call your new plugin binary. eg: + +``` +[[inputs.execd]] + command = ["/path/to/rand", "-config", "/path/to/plugin.conf"] + signal = "none" +``` + +## Congratulations! + +You've done it! Consider publishing your plugin to github and open a Pull Request +back to the Telegraf repo letting us know about the availability of your +[external plugin](https://github.com/influxdata/telegraf/blob/master/EXTERNAL_PLUGINS.md). \ No newline at end of file diff --git a/plugins/inputs/execd/shim/example/cmd/main.go b/plugins/inputs/execd/shim/example/cmd/main.go new file mode 100644 index 000000000..bf8bd50d8 --- /dev/null +++ b/plugins/inputs/execd/shim/example/cmd/main.go @@ -0,0 +1,60 @@ +package main + +import ( + "flag" + "fmt" + "os" + "time" + + // TODO: import your plugins + // _ "github.com/my_github_user/my_plugin_repo/plugins/inputs/mypluginname" + + "github.com/influxdata/telegraf/plugins/inputs/execd/shim" +) + +var pollInterval = flag.Duration("poll_interval", 1*time.Second, "how often to send metrics") +var pollIntervalDisabled = flag.Bool("poll_interval_disabled", false, "how often to send metrics") +var configFile = flag.String("config", "", "path to the config file for this plugin") +var err error + +// This is designed to be simple; Just change the import above and you're good. +// +// However, if you want to do all your config in code, you can like so: +// +// // initialize your plugin with any settngs you want +// myInput := &mypluginname.MyPlugin{ +// DefaultSettingHere: 3, +// } +// +// shim := shim.New() +// +// shim.AddInput(myInput) +// +// // now the shim.Run() call as below. +// +func main() { + // parse command line options + flag.Parse() + if *pollIntervalDisabled { + *pollInterval = shim.PollIntervalDisabled + } + + // create the shim. This is what will run your plugins. + shim := shim.New() + + // If no config is specified, all imported plugins are loaded. + // otherwise follow what the config asks for. + // Check for settings from a config toml file, + // (or just use whatever plugins were imported above) + err = shim.LoadConfig(configFile) + if err != nil { + fmt.Fprintf(os.Stderr, "Err loading input: %s\n", err) + os.Exit(1) + } + + // run the input plugin(s) until stdin closes or we receive a termination signal + if err := shim.Run(*pollInterval); err != nil { + fmt.Fprintf(os.Stderr, "Err: %s\n", err) + os.Exit(1) + } +} diff --git a/plugins/inputs/execd/shim/example/cmd/plugin.conf b/plugins/inputs/execd/shim/example/cmd/plugin.conf new file mode 100644 index 000000000..53f89a559 --- /dev/null +++ b/plugins/inputs/execd/shim/example/cmd/plugin.conf @@ -0,0 +1,2 @@ +[[inputs.my_plugin_name]] + value_name = "value" diff --git a/plugins/inputs/execd/shim/goshim.go b/plugins/inputs/execd/shim/goshim.go new file mode 100644 index 000000000..cd0c4ddec --- /dev/null +++ b/plugins/inputs/execd/shim/goshim.go @@ -0,0 +1,278 @@ +package shim + +import ( + "bufio" + "context" + "errors" + "fmt" + "io" + "io/ioutil" + "os" + "os/signal" + "sync" + "syscall" + "time" + + "github.com/BurntSushi/toml" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/agent" + "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf/plugins/serializers/influx" +) + +type empty struct{} + +var ( + gatherPromptChans []chan empty + stdout io.Writer = os.Stdout + stdin io.Reader = os.Stdin +) + +const ( + // PollIntervalDisabled is used to indicate that you want to disable polling, + // as opposed to duration 0 meaning poll constantly. + PollIntervalDisabled = time.Duration(0) +) + +type Shim struct { + Inputs []telegraf.Input +} + +func New() *Shim { + return &Shim{} +} + +// AddInput adds the input to the shim. Later calls to Run() will run this input. +func (s *Shim) AddInput(input telegraf.Input) error { + if p, ok := input.(telegraf.Initializer); ok { + err := p.Init() + if err != nil { + return fmt.Errorf("failed to init input: %s", err) + } + } + + s.Inputs = append(s.Inputs, input) + return nil +} + +// AddInputs adds multiple inputs to the shim. Later calls to Run() will run these. +func (s *Shim) AddInputs(newInputs []telegraf.Input) error { + for _, inp := range newInputs { + if err := s.AddInput(inp); err != nil { + return err + } + } + return nil +} + +// Run the input plugins.. +func (s *Shim) Run(pollInterval time.Duration) error { + wg := sync.WaitGroup{} + quit := make(chan os.Signal, 1) + signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) + + collectMetricsPrompt := make(chan os.Signal, 1) + listenForCollectMetricsSignals(collectMetricsPrompt) + + wg.Add(1) // wait for the metric channel to close + metricCh := make(chan telegraf.Metric, 1) + + serializer := influx.NewSerializer() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + for _, input := range s.Inputs { + wrappedInput := inputShim{Input: input} + + acc := agent.NewAccumulator(wrappedInput, metricCh) + acc.SetPrecision(time.Nanosecond) + + if serviceInput, ok := input.(telegraf.ServiceInput); ok { + if err := serviceInput.Start(acc); err != nil { + return fmt.Errorf("failed to start input: %s", err) + } + } + gatherPromptCh := make(chan empty, 1) + gatherPromptChans = append(gatherPromptChans, gatherPromptCh) + wg.Add(1) + go func(input telegraf.Input) { + startGathering(ctx, input, acc, gatherPromptCh, pollInterval) + if serviceInput, ok := input.(telegraf.ServiceInput); ok { + serviceInput.Stop() + } + wg.Done() + }(input) + } + + go stdinCollectMetricsPrompt(ctx, collectMetricsPrompt) + +loop: + for { + select { + case <-quit: + // cancel, but keep looping until the metric channel closes. + cancel() + case <-collectMetricsPrompt: + collectMetrics(ctx) + case m, open := <-metricCh: + if !open { + wg.Done() + break loop + } + b, err := serializer.Serialize(m) + if err != nil { + return fmt.Errorf("failed to serialize metric: %s", err) + } + // Write this to stdout + fmt.Fprint(stdout, string(b)) + } + } + + wg.Wait() + return nil +} + +func hasQuit(ctx context.Context) bool { + select { + case <-ctx.Done(): + return true + default: + return false + } +} + +func stdinCollectMetricsPrompt(ctx context.Context, collectMetricsPrompt chan<- os.Signal) { + s := bufio.NewScanner(stdin) + // for every line read from stdin, make sure we're not supposed to quit, + // then push a message on to the collectMetricsPrompt + for s.Scan() { + // first check if we should quit + if hasQuit(ctx) { + return + } + + // now push a non-blocking message to trigger metric collection. + pushCollectMetricsRequest(collectMetricsPrompt) + } +} + +// pushCollectMetricsRequest pushes a non-blocking (nil) message to the +// collectMetricsPrompt channel to trigger metric collection. +// The channel is defined with a buffer of 1, so if it's full, duplicated +// requests are discarded. +func pushCollectMetricsRequest(collectMetricsPrompt chan<- os.Signal) { + select { + case collectMetricsPrompt <- nil: + default: + } +} + +func collectMetrics(ctx context.Context) { + if hasQuit(ctx) { + return + } + for i := 0; i < len(gatherPromptChans); i++ { + // push a message out to each channel to collect metrics. don't block. + select { + case gatherPromptChans[i] <- empty{}: + default: + } + } +} + +func startGathering(ctx context.Context, input telegraf.Input, acc telegraf.Accumulator, gatherPromptCh <-chan empty, pollInterval time.Duration) { + if pollInterval == PollIntervalDisabled { + return // don't poll + } + t := time.NewTicker(pollInterval) + defer t.Stop() + for { + // give priority to stopping. + if hasQuit(ctx) { + return + } + // see what's up + select { + case <-ctx.Done(): + return + case <-gatherPromptCh: + if err := input.Gather(acc); err != nil { + fmt.Fprintf(os.Stderr, "failed to gather metrics: %s", err) + } + case <-t.C: + if err := input.Gather(acc); err != nil { + fmt.Fprintf(os.Stderr, "failed to gather metrics: %s", err) + } + } + } +} + +// LoadConfig loads and adds the inputs to the shim +func (s *Shim) LoadConfig(filePath *string) error { + loadedInputs, err := LoadConfig(filePath) + if err != nil { + return err + } + return s.AddInputs(loadedInputs) +} + +// DefaultImportedPlugins defaults to whatever plugins happen to be loaded and +// have registered themselves with the registry. This makes loading plugins +// without having to define a config dead easy. +func DefaultImportedPlugins() (i []telegraf.Input, e error) { + for _, inputCreatorFunc := range inputs.Inputs { + i = append(i, inputCreatorFunc()) + } + return i, nil +} + +// LoadConfig loads the config and returns inputs that later need to be loaded. +func LoadConfig(filePath *string) ([]telegraf.Input, error) { + if filePath == nil { + return DefaultImportedPlugins() + } + + b, err := ioutil.ReadFile(*filePath) + if err != nil { + return nil, err + } + + conf := struct { + Inputs map[string][]toml.Primitive + }{} + + md, err := toml.Decode(string(b), &conf) + if err != nil { + return nil, err + } + + loadedInputs, err := loadConfigIntoInputs(md, conf.Inputs) + + if len(md.Undecoded()) > 0 { + fmt.Fprintf(stdout, "Some plugins were loaded but not used: %q\n", md.Undecoded()) + } + return loadedInputs, err +} + +func loadConfigIntoInputs(md toml.MetaData, inputConfigs map[string][]toml.Primitive) ([]telegraf.Input, error) { + renderedInputs := []telegraf.Input{} + + for name, primitives := range inputConfigs { + inputCreator, ok := inputs.Inputs[name] + if !ok { + return nil, errors.New("unknown input " + name) + } + + for _, primitive := range primitives { + inp := inputCreator() + // Parse specific configuration + if err := md.PrimitiveDecode(primitive, inp); err != nil { + return nil, err + } + + renderedInputs = append(renderedInputs, inp) + } + } + return renderedInputs, nil +} diff --git a/plugins/inputs/execd/shim/goshim_notwindows.go b/plugins/inputs/execd/shim/goshim_notwindows.go new file mode 100644 index 000000000..67d41884f --- /dev/null +++ b/plugins/inputs/execd/shim/goshim_notwindows.go @@ -0,0 +1,14 @@ +// +build !windows + +package shim + +import ( + "os" + "os/signal" + "syscall" +) + +func listenForCollectMetricsSignals(collectMetricsPrompt chan os.Signal) { + // just listen to all the signals. + signal.Notify(collectMetricsPrompt, syscall.SIGHUP, syscall.SIGUSR1, syscall.SIGUSR2) +} diff --git a/plugins/inputs/execd/shim/goshim_windows.go b/plugins/inputs/execd/shim/goshim_windows.go new file mode 100644 index 000000000..a6bfd1ede --- /dev/null +++ b/plugins/inputs/execd/shim/goshim_windows.go @@ -0,0 +1,13 @@ +// +build windows + +package shim + +import ( + "os" + "os/signal" + "syscall" +) + +func listenForCollectMetricsSignals(collectMetricsPrompt chan os.Signal) { + signal.Notify(collectMetricsPrompt, syscall.SIGHUP) +} diff --git a/plugins/inputs/execd/shim/input.go b/plugins/inputs/execd/shim/input.go new file mode 100644 index 000000000..6dff9cd7f --- /dev/null +++ b/plugins/inputs/execd/shim/input.go @@ -0,0 +1,20 @@ +package shim + +import "github.com/influxdata/telegraf" + +// inputShim implements the MetricMaker interface. +type inputShim struct { + Input telegraf.Input +} + +func (i inputShim) LogName() string { + return "" +} + +func (i inputShim) MakeMetric(m telegraf.Metric) telegraf.Metric { + return m // don't need to do anything to it. +} + +func (i inputShim) Log() telegraf.Logger { + return nil +} diff --git a/plugins/inputs/execd/shim/shim_posix_test.go b/plugins/inputs/execd/shim/shim_posix_test.go new file mode 100644 index 000000000..85053130f --- /dev/null +++ b/plugins/inputs/execd/shim/shim_posix_test.go @@ -0,0 +1,76 @@ +// +build !windows + +package shim + +import ( + "bytes" + "context" + "os" + "runtime" + "strings" + "syscall" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestShimUSR1SignalingWorks(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip() + return + } + stdoutBytes := bytes.NewBufferString("") + stdout = stdoutBytes + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + wait := runInputPlugin(t, 40*time.Second) + + // sleep a bit to avoid a race condition where the input hasn't loaded yet. + time.Sleep(10 * time.Millisecond) + + // signal USR1 to yourself. + pid := os.Getpid() + process, err := os.FindProcess(pid) + require.NoError(t, err) + + go func() { + // On slow machines this signal can fire before the service comes up. + // rather than depend on accurate sleep times, we'll just retry sending + // the signal every so often until it goes through. + for { + select { + case <-ctx.Done(): + return // test is done + default: + // test isn't done, keep going. + process.Signal(syscall.SIGUSR1) + time.Sleep(200 * time.Millisecond) + } + } + }() + + timeout := time.NewTimer(10 * time.Second) + + select { + case <-wait: + case <-timeout.C: + require.Fail(t, "Timeout waiting for metric to arrive") + } + + for stdoutBytes.Len() == 0 { + select { + case <-timeout.C: + require.Fail(t, "Timeout waiting to read metric from stdout") + return + default: + time.Sleep(10 * time.Millisecond) + } + } + + out := string(stdoutBytes.Bytes()) + require.Contains(t, out, "\n") + metricLine := strings.Split(out, "\n")[0] + require.Equal(t, "measurement,tag=tag field=1i 1234000005678", metricLine) +} diff --git a/plugins/inputs/execd/shim/shim_test.go b/plugins/inputs/execd/shim/shim_test.go new file mode 100644 index 000000000..9d97bd239 --- /dev/null +++ b/plugins/inputs/execd/shim/shim_test.go @@ -0,0 +1,119 @@ +package shim + +import ( + "bytes" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf" +) + +func TestShimWorks(t *testing.T) { + stdoutBytes := bytes.NewBufferString("") + stdout = stdoutBytes + + timeout := time.NewTimer(10 * time.Second) + wait := runInputPlugin(t, 10*time.Millisecond) + + select { + case <-wait: + case <-timeout.C: + require.Fail(t, "Timeout waiting for metric to arrive") + } + for stdoutBytes.Len() == 0 { + select { + case <-timeout.C: + require.Fail(t, "Timeout waiting to read metric from stdout") + return + default: + time.Sleep(10 * time.Millisecond) + } + } + + out := string(stdoutBytes.Bytes()) + require.Contains(t, out, "\n") + metricLine := strings.Split(out, "\n")[0] + require.Equal(t, "measurement,tag=tag field=1i 1234000005678", metricLine) +} + +func TestShimStdinSignalingWorks(t *testing.T) { + stdoutBytes := bytes.NewBufferString("") + stdout = stdoutBytes + stdinBytes := bytes.NewBufferString("") + stdin = stdinBytes + + timeout := time.NewTimer(10 * time.Second) + wait := runInputPlugin(t, 40*time.Second) + + stdinBytes.WriteString("\n") + + select { + case <-wait: + case <-timeout.C: + require.Fail(t, "Timeout waiting for metric to arrive") + } + + for stdoutBytes.Len() == 0 { + select { + case <-timeout.C: + require.Fail(t, "Timeout waiting to read metric from stdout") + return + default: + time.Sleep(10 * time.Millisecond) + } + } + + out := string(stdoutBytes.Bytes()) + require.Contains(t, out, "\n") + metricLine := strings.Split(out, "\n")[0] + require.Equal(t, "measurement,tag=tag field=1i 1234000005678", metricLine) +} + +func runInputPlugin(t *testing.T, timeout time.Duration) chan bool { + wait := make(chan bool) + inp := &testInput{ + wait: wait, + } + + shim := New() + shim.AddInput(inp) + go func() { + err := shim.Run(timeout) // we aren't using the timer here + require.NoError(t, err) + }() + return wait +} + +type testInput struct { + wait chan bool +} + +func (i *testInput) SampleConfig() string { + return "" +} + +func (i *testInput) Description() string { + return "" +} + +func (i *testInput) Gather(acc telegraf.Accumulator) error { + acc.AddFields("measurement", + map[string]interface{}{ + "field": 1, + }, + map[string]string{ + "tag": "tag", + }, time.Unix(1234, 5678)) + i.wait <- true + return nil +} + +func (i *testInput) Start(acc telegraf.Accumulator) error { + return nil +} + +func (i *testInput) Stop() { +} diff --git a/plugins/inputs/kinesis_consumer/kinesis_consumer.go b/plugins/inputs/kinesis_consumer/kinesis_consumer.go index aec806da1..b524cf9e4 100644 --- a/plugins/inputs/kinesis_consumer/kinesis_consumer.go +++ b/plugins/inputs/kinesis_consumer/kinesis_consumer.go @@ -14,7 +14,7 @@ import ( "github.com/harlow/kinesis-consumer/checkpoint/ddb" "github.com/influxdata/telegraf" - internalaws "github.com/influxdata/telegraf/internal/config/aws" + internalaws "github.com/influxdata/telegraf/config/aws" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" ) diff --git a/plugins/outputs/cloudwatch/cloudwatch.go b/plugins/outputs/cloudwatch/cloudwatch.go index 1ae8bd4f8..5e59ba2aa 100644 --- a/plugins/outputs/cloudwatch/cloudwatch.go +++ b/plugins/outputs/cloudwatch/cloudwatch.go @@ -11,7 +11,7 @@ import ( "github.com/aws/aws-sdk-go/service/cloudwatch" "github.com/influxdata/telegraf" - internalaws "github.com/influxdata/telegraf/internal/config/aws" + internalaws "github.com/influxdata/telegraf/config/aws" "github.com/influxdata/telegraf/plugins/outputs" ) diff --git a/plugins/outputs/kinesis/kinesis.go b/plugins/outputs/kinesis/kinesis.go index d2d482ff3..f6b205b1e 100644 --- a/plugins/outputs/kinesis/kinesis.go +++ b/plugins/outputs/kinesis/kinesis.go @@ -8,7 +8,7 @@ import ( "github.com/aws/aws-sdk-go/service/kinesis" "github.com/gofrs/uuid" "github.com/influxdata/telegraf" - internalaws "github.com/influxdata/telegraf/internal/config/aws" + internalaws "github.com/influxdata/telegraf/config/aws" "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/serializers" )