diff --git a/accumulator.go b/accumulator.go index 83f61ae99..cbea58ebf 100644 --- a/accumulator.go +++ b/accumulator.go @@ -1,188 +1,21 @@ package telegraf -import ( - "fmt" - "log" - "math" - "sync" - "time" - - "github.com/influxdata/telegraf/internal/models" - - "github.com/influxdata/influxdb/client/v2" -) +import "time" type Accumulator interface { - Add(measurement string, value interface{}, - tags map[string]string, t ...time.Time) - AddFields(measurement string, fields map[string]interface{}, - tags map[string]string, t ...time.Time) + // Create a point with a value, decorating it with tags + // NOTE: tags is expected to be owned by the caller, don't mutate + // it after passing to Add. + Add(measurement string, + value interface{}, + tags map[string]string, + t ...time.Time) - SetDefaultTags(tags map[string]string) - AddDefaultTag(key, value string) - - Prefix() string - SetPrefix(prefix string) + AddFields(measurement string, + fields map[string]interface{}, + tags map[string]string, + t ...time.Time) Debug() bool SetDebug(enabled bool) } - -func NewAccumulator( - inputConfig *models.InputConfig, - points chan *client.Point, -) Accumulator { - acc := accumulator{} - acc.points = points - acc.inputConfig = inputConfig - return &acc -} - -type accumulator struct { - sync.Mutex - - points chan *client.Point - - defaultTags map[string]string - - debug bool - - inputConfig *models.InputConfig - - prefix string -} - -func (ac *accumulator) Add( - measurement string, - value interface{}, - tags map[string]string, - t ...time.Time, -) { - fields := make(map[string]interface{}) - fields["value"] = value - ac.AddFields(measurement, fields, tags, t...) -} - -func (ac *accumulator) AddFields( - measurement string, - fields map[string]interface{}, - tags map[string]string, - t ...time.Time, -) { - if len(fields) == 0 || len(measurement) == 0 { - return - } - - if !ac.inputConfig.Filter.ShouldTagsPass(tags) { - return - } - - // Override measurement name if set - if len(ac.inputConfig.NameOverride) != 0 { - measurement = ac.inputConfig.NameOverride - } - // Apply measurement prefix and suffix if set - if len(ac.inputConfig.MeasurementPrefix) != 0 { - measurement = ac.inputConfig.MeasurementPrefix + measurement - } - if len(ac.inputConfig.MeasurementSuffix) != 0 { - measurement = measurement + ac.inputConfig.MeasurementSuffix - } - - if tags == nil { - tags = make(map[string]string) - } - // Apply plugin-wide tags if set - for k, v := range ac.inputConfig.Tags { - if _, ok := tags[k]; !ok { - tags[k] = v - } - } - // Apply daemon-wide tags if set - for k, v := range ac.defaultTags { - if _, ok := tags[k]; !ok { - tags[k] = v - } - } - - result := make(map[string]interface{}) - for k, v := range fields { - // Filter out any filtered fields - if ac.inputConfig != nil { - if !ac.inputConfig.Filter.ShouldPass(k) { - continue - } - } - result[k] = v - - // Validate uint64 and float64 fields - switch val := v.(type) { - case uint64: - // InfluxDB does not support writing uint64 - if val < uint64(9223372036854775808) { - result[k] = int64(val) - } else { - result[k] = int64(9223372036854775807) - } - case float64: - // NaNs are invalid values in influxdb, skip measurement - if math.IsNaN(val) || math.IsInf(val, 0) { - if ac.debug { - log.Printf("Measurement [%s] field [%s] has a NaN or Inf "+ - "field, skipping", - measurement, k) - } - continue - } - } - } - fields = nil - if len(result) == 0 { - return - } - - var timestamp time.Time - if len(t) > 0 { - timestamp = t[0] - } else { - timestamp = time.Now() - } - - if ac.prefix != "" { - measurement = ac.prefix + measurement - } - - pt, err := client.NewPoint(measurement, tags, result, timestamp) - if err != nil { - log.Printf("Error adding point [%s]: %s\n", measurement, err.Error()) - return - } - if ac.debug { - fmt.Println("> " + pt.String()) - } - ac.points <- pt -} - -func (ac *accumulator) SetDefaultTags(tags map[string]string) { - ac.defaultTags = tags -} - -func (ac *accumulator) AddDefaultTag(key, value string) { - ac.defaultTags[key] = value -} - -func (ac *accumulator) Prefix() string { - return ac.prefix -} - -func (ac *accumulator) SetPrefix(prefix string) { - ac.prefix = prefix -} - -func (ac *accumulator) Debug() bool { - return ac.debug -} - -func (ac *accumulator) SetDebug(debug bool) { - ac.debug = debug -} diff --git a/agent/accumulator.go b/agent/accumulator.go new file mode 100644 index 000000000..30d59e919 --- /dev/null +++ b/agent/accumulator.go @@ -0,0 +1,164 @@ +package agent + +import ( + "fmt" + "log" + "math" + "sync" + "time" + + "github.com/influxdata/telegraf/internal/models" + + "github.com/influxdata/influxdb/client/v2" +) + +func NewAccumulator( + inputConfig *internal_models.InputConfig, + points chan *client.Point, +) *accumulator { + acc := accumulator{} + acc.points = points + acc.inputConfig = inputConfig + return &acc +} + +type accumulator struct { + sync.Mutex + + points chan *client.Point + + defaultTags map[string]string + + debug bool + + inputConfig *internal_models.InputConfig + + prefix string +} + +func (ac *accumulator) Add( + measurement string, + value interface{}, + tags map[string]string, + t ...time.Time, +) { + fields := make(map[string]interface{}) + fields["value"] = value + ac.AddFields(measurement, fields, tags, t...) +} + +func (ac *accumulator) AddFields( + measurement string, + fields map[string]interface{}, + tags map[string]string, + t ...time.Time, +) { + if len(fields) == 0 || len(measurement) == 0 { + return + } + + if !ac.inputConfig.Filter.ShouldTagsPass(tags) { + return + } + + // Override measurement name if set + if len(ac.inputConfig.NameOverride) != 0 { + measurement = ac.inputConfig.NameOverride + } + // Apply measurement prefix and suffix if set + if len(ac.inputConfig.MeasurementPrefix) != 0 { + measurement = ac.inputConfig.MeasurementPrefix + measurement + } + if len(ac.inputConfig.MeasurementSuffix) != 0 { + measurement = measurement + ac.inputConfig.MeasurementSuffix + } + + if tags == nil { + tags = make(map[string]string) + } + // Apply plugin-wide tags if set + for k, v := range ac.inputConfig.Tags { + if _, ok := tags[k]; !ok { + tags[k] = v + } + } + // Apply daemon-wide tags if set + for k, v := range ac.defaultTags { + if _, ok := tags[k]; !ok { + tags[k] = v + } + } + + result := make(map[string]interface{}) + for k, v := range fields { + // Filter out any filtered fields + if ac.inputConfig != nil { + if !ac.inputConfig.Filter.ShouldPass(k) { + continue + } + } + result[k] = v + + // Validate uint64 and float64 fields + switch val := v.(type) { + case uint64: + // InfluxDB does not support writing uint64 + if val < uint64(9223372036854775808) { + result[k] = int64(val) + } else { + result[k] = int64(9223372036854775807) + } + case float64: + // NaNs are invalid values in influxdb, skip measurement + if math.IsNaN(val) || math.IsInf(val, 0) { + if ac.debug { + log.Printf("Measurement [%s] field [%s] has a NaN or Inf "+ + "field, skipping", + measurement, k) + } + continue + } + } + } + fields = nil + if len(result) == 0 { + return + } + + var timestamp time.Time + if len(t) > 0 { + timestamp = t[0] + } else { + timestamp = time.Now() + } + + if ac.prefix != "" { + measurement = ac.prefix + measurement + } + + pt, err := client.NewPoint(measurement, tags, result, timestamp) + if err != nil { + log.Printf("Error adding point [%s]: %s\n", measurement, err.Error()) + return + } + if ac.debug { + fmt.Println("> " + pt.String()) + } + ac.points <- pt +} + +func (ac *accumulator) Debug() bool { + return ac.debug +} + +func (ac *accumulator) SetDebug(debug bool) { + ac.debug = debug +} + +func (ac *accumulator) setDefaultTags(tags map[string]string) { + ac.defaultTags = tags +} + +func (ac *accumulator) addDefaultTag(key, value string) { + ac.defaultTags[key] = value +} diff --git a/agent.go b/agent/agent.go similarity index 94% rename from agent.go rename to agent/agent.go index ee5f45029..8825a2ec8 100644 --- a/agent.go +++ b/agent/agent.go @@ -1,4 +1,4 @@ -package telegraf +package agent import ( cryptorand "crypto/rand" @@ -11,10 +11,9 @@ import ( "sync" "time" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal/config" "github.com/influxdata/telegraf/internal/models" - "github.com/influxdata/telegraf/plugins/inputs" - "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/influxdb/client/v2" ) @@ -48,7 +47,7 @@ func NewAgent(config *config.Config) (*Agent, error) { func (a *Agent) Connect() error { for _, o := range a.Config.Outputs { switch ot := o.Output.(type) { - case outputs.ServiceOutput: + case telegraf.ServiceOutput: if err := ot.Start(); err != nil { log.Printf("Service for output %s failed to start, exiting\n%s\n", o.Name, err.Error()) @@ -81,14 +80,14 @@ func (a *Agent) Close() error { for _, o := range a.Config.Outputs { err = o.Output.Close() switch ot := o.Output.(type) { - case outputs.ServiceOutput: + case telegraf.ServiceOutput: ot.Stop() } } return err } -func panicRecover(input *models.RunningInput) { +func panicRecover(input *internal_models.RunningInput) { if err := recover(); err != nil { trace := make([]byte, 2048) runtime.Stack(trace, true) @@ -115,13 +114,13 @@ func (a *Agent) gatherParallel(pointChan chan *client.Point) error { wg.Add(1) counter++ - go func(input *models.RunningInput) { + go func(input *internal_models.RunningInput) { defer panicRecover(input) defer wg.Done() acc := NewAccumulator(input.Config, pointChan) acc.SetDebug(a.Config.Agent.Debug) - acc.SetDefaultTags(a.Config.Tags) + acc.setDefaultTags(a.Config.Tags) if jitter != 0 { nanoSleep := rand.Int63n(jitter) @@ -159,7 +158,7 @@ func (a *Agent) gatherParallel(pointChan chan *client.Point) error { // reporting interval. func (a *Agent) gatherSeparate( shutdown chan struct{}, - input *models.RunningInput, + input *internal_models.RunningInput, pointChan chan *client.Point, ) error { defer panicRecover(input) @@ -172,7 +171,7 @@ func (a *Agent) gatherSeparate( acc := NewAccumulator(input.Config, pointChan) acc.SetDebug(a.Config.Agent.Debug) - acc.SetDefaultTags(a.Config.Tags) + acc.setDefaultTags(a.Config.Tags) if err := input.Input.Gather(acc); err != nil { log.Printf("Error in input [%s]: %s", input.Name, err) @@ -250,7 +249,7 @@ func (a *Agent) flush() { wg.Add(len(a.Config.Outputs)) for _, o := range a.Config.Outputs { - go func(output *models.RunningOutput) { + go func(output *internal_models.RunningOutput) { defer wg.Done() err := output.Write() if err != nil { @@ -344,7 +343,7 @@ func (a *Agent) Run(shutdown chan struct{}) error { // Start service of any ServicePlugins switch p := input.Input.(type) { - case inputs.ServiceInput: + case telegraf.ServiceInput: if err := p.Start(); err != nil { log.Printf("Service for input %s failed to start, exiting\n%s\n", input.Name, err.Error()) @@ -357,7 +356,7 @@ func (a *Agent) Run(shutdown chan struct{}) error { // configured. Default intervals are handled below with gatherParallel if input.Config.Interval != 0 { wg.Add(1) - go func(input *models.RunningInput) { + go func(input *internal_models.RunningInput) { defer wg.Done() if err := a.gatherSeparate(shutdown, input, pointChan); err != nil { log.Printf(err.Error()) diff --git a/agent_test.go b/agent/agent_test.go similarity index 82% rename from agent_test.go rename to agent/agent_test.go index 3420e665a..8bf8a150b 100644 --- a/agent_test.go +++ b/agent/agent_test.go @@ -1,4 +1,4 @@ -package telegraf +package agent import ( "github.com/stretchr/testify/assert" @@ -16,35 +16,35 @@ import ( func TestAgent_LoadPlugin(t *testing.T) { c := config.NewConfig() c.InputFilters = []string{"mysql"} - err := c.LoadConfig("./internal/config/testdata/telegraf-agent.toml") + err := c.LoadConfig("../internal/config/testdata/telegraf-agent.toml") assert.NoError(t, err) a, _ := NewAgent(c) assert.Equal(t, 1, len(a.Config.Inputs)) c = config.NewConfig() c.InputFilters = []string{"foo"} - err = c.LoadConfig("./internal/config/testdata/telegraf-agent.toml") + err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml") assert.NoError(t, err) a, _ = NewAgent(c) assert.Equal(t, 0, len(a.Config.Inputs)) c = config.NewConfig() c.InputFilters = []string{"mysql", "foo"} - err = c.LoadConfig("./internal/config/testdata/telegraf-agent.toml") + err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml") assert.NoError(t, err) a, _ = NewAgent(c) assert.Equal(t, 1, len(a.Config.Inputs)) c = config.NewConfig() c.InputFilters = []string{"mysql", "redis"} - err = c.LoadConfig("./internal/config/testdata/telegraf-agent.toml") + err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml") assert.NoError(t, err) a, _ = NewAgent(c) assert.Equal(t, 2, len(a.Config.Inputs)) c = config.NewConfig() c.InputFilters = []string{"mysql", "foo", "redis", "bar"} - err = c.LoadConfig("./internal/config/testdata/telegraf-agent.toml") + err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml") assert.NoError(t, err) a, _ = NewAgent(c) assert.Equal(t, 2, len(a.Config.Inputs)) @@ -53,42 +53,42 @@ func TestAgent_LoadPlugin(t *testing.T) { func TestAgent_LoadOutput(t *testing.T) { c := config.NewConfig() c.OutputFilters = []string{"influxdb"} - err := c.LoadConfig("./internal/config/testdata/telegraf-agent.toml") + err := c.LoadConfig("../internal/config/testdata/telegraf-agent.toml") assert.NoError(t, err) a, _ := NewAgent(c) assert.Equal(t, 2, len(a.Config.Outputs)) c = config.NewConfig() c.OutputFilters = []string{"kafka"} - err = c.LoadConfig("./internal/config/testdata/telegraf-agent.toml") + err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml") assert.NoError(t, err) a, _ = NewAgent(c) assert.Equal(t, 1, len(a.Config.Outputs)) c = config.NewConfig() c.OutputFilters = []string{} - err = c.LoadConfig("./internal/config/testdata/telegraf-agent.toml") + err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml") assert.NoError(t, err) a, _ = NewAgent(c) assert.Equal(t, 3, len(a.Config.Outputs)) c = config.NewConfig() c.OutputFilters = []string{"foo"} - err = c.LoadConfig("./internal/config/testdata/telegraf-agent.toml") + err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml") assert.NoError(t, err) a, _ = NewAgent(c) assert.Equal(t, 0, len(a.Config.Outputs)) c = config.NewConfig() c.OutputFilters = []string{"influxdb", "foo"} - err = c.LoadConfig("./internal/config/testdata/telegraf-agent.toml") + err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml") assert.NoError(t, err) a, _ = NewAgent(c) assert.Equal(t, 2, len(a.Config.Outputs)) c = config.NewConfig() c.OutputFilters = []string{"influxdb", "kafka"} - err = c.LoadConfig("./internal/config/testdata/telegraf-agent.toml") + err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml") assert.NoError(t, err) assert.Equal(t, 3, len(c.Outputs)) a, _ = NewAgent(c) @@ -96,7 +96,7 @@ func TestAgent_LoadOutput(t *testing.T) { c = config.NewConfig() c.OutputFilters = []string{"influxdb", "foo", "kafka", "bar"} - err = c.LoadConfig("./internal/config/testdata/telegraf-agent.toml") + err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml") assert.NoError(t, err) a, _ = NewAgent(c) assert.Equal(t, 3, len(a.Config.Outputs)) diff --git a/cmd/telegraf/telegraf.go b/cmd/telegraf/telegraf.go index 72fb9fdcf..12b4073db 100644 --- a/cmd/telegraf/telegraf.go +++ b/cmd/telegraf/telegraf.go @@ -9,7 +9,7 @@ import ( "strings" "syscall" - "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/agent" "github.com/influxdata/telegraf/internal/config" _ "github.com/influxdata/telegraf/plugins/inputs/all" _ "github.com/influxdata/telegraf/plugins/outputs/all" @@ -173,7 +173,7 @@ func main() { log.Fatalf("Error: no inputs found, did you provide a valid config file?") } - ag, err := telegraf.NewAgent(c) + ag, err := agent.NewAgent(c) if err != nil { log.Fatal(err) } diff --git a/input.go b/input.go new file mode 100644 index 000000000..6992c1b43 --- /dev/null +++ b/input.go @@ -0,0 +1,31 @@ +package telegraf + +type Input interface { + // SampleConfig returns the default configuration of the Input + SampleConfig() string + + // Description returns a one-sentence description on the Input + Description() string + + // Gather takes in an accumulator and adds the metrics that the Input + // gathers. This is called every "interval" + Gather(Accumulator) error +} + +type ServiceInput interface { + // SampleConfig returns the default configuration of the Input + SampleConfig() string + + // Description returns a one-sentence description on the Input + Description() string + + // Gather takes in an accumulator and adds the metrics that the Input + // gathers. This is called every "interval" + Gather(Accumulator) error + + // Start starts the ServiceInput's service, whatever that may be + Start() error + + // Stop stops the services and closes any necessary channels and connections + Stop() +} diff --git a/internal/config/config.go b/internal/config/config.go index feb472ab1..9b35cd407 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -10,6 +10,7 @@ import ( "strings" "time" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal/models" "github.com/influxdata/telegraf/plugins/inputs" @@ -28,8 +29,8 @@ type Config struct { OutputFilters []string Agent *AgentConfig - Inputs []*models.RunningInput - Outputs []*models.RunningOutput + Inputs []*internal_models.RunningInput + Outputs []*internal_models.RunningOutput } func NewConfig() *Config { @@ -43,8 +44,8 @@ func NewConfig() *Config { }, Tags: make(map[string]string), - Inputs: make([]*models.RunningInput, 0), - Outputs: make([]*models.RunningOutput, 0), + Inputs: make([]*internal_models.RunningInput, 0), + Outputs: make([]*internal_models.RunningOutput, 0), InputFilters: make([]string, 0), OutputFilters: make([]string, 0), } @@ -227,13 +228,13 @@ func PrintSampleConfig(pluginFilters []string, outputFilters []string) { // Print Inputs fmt.Printf(pluginHeader) - servInputs := make(map[string]inputs.ServiceInput) + servInputs := make(map[string]telegraf.ServiceInput) for _, pname := range pnames { creator := inputs.Inputs[pname] input := creator() switch p := input.(type) { - case inputs.ServiceInput: + case telegraf.ServiceInput: servInputs[pname] = p continue } @@ -403,7 +404,7 @@ func (c *Config) addOutput(name string, table *ast.Table) error { return err } - ro := models.NewRunningOutput(name, output, outputConfig) + ro := internal_models.NewRunningOutput(name, output, outputConfig) if c.Agent.MetricBufferLimit > 0 { ro.PointBufferLimit = c.Agent.MetricBufferLimit } @@ -436,7 +437,7 @@ func (c *Config) addInput(name string, table *ast.Table) error { return err } - rp := &models.RunningInput{ + rp := &internal_models.RunningInput{ Name: name, Input: input, Config: pluginConfig, @@ -446,10 +447,10 @@ func (c *Config) addInput(name string, table *ast.Table) error { } // buildFilter builds a Filter (tagpass/tagdrop/pass/drop) to -// be inserted into the models.OutputConfig/models.InputConfig to be used for prefix +// be inserted into the internal_models.OutputConfig/internal_models.InputConfig to be used for prefix // filtering on tags and measurements -func buildFilter(tbl *ast.Table) models.Filter { - f := models.Filter{} +func buildFilter(tbl *ast.Table) internal_models.Filter { + f := internal_models.Filter{} if node, ok := tbl.Fields["pass"]; ok { if kv, ok := node.(*ast.KeyValue); ok { @@ -481,7 +482,7 @@ func buildFilter(tbl *ast.Table) models.Filter { if subtbl, ok := node.(*ast.Table); ok { for name, val := range subtbl.Fields { if kv, ok := val.(*ast.KeyValue); ok { - tagfilter := &models.TagFilter{Name: name} + tagfilter := &internal_models.TagFilter{Name: name} if ary, ok := kv.Value.(*ast.Array); ok { for _, elem := range ary.Value { if str, ok := elem.(*ast.String); ok { @@ -500,7 +501,7 @@ func buildFilter(tbl *ast.Table) models.Filter { if subtbl, ok := node.(*ast.Table); ok { for name, val := range subtbl.Fields { if kv, ok := val.(*ast.KeyValue); ok { - tagfilter := &models.TagFilter{Name: name} + tagfilter := &internal_models.TagFilter{Name: name} if ary, ok := kv.Value.(*ast.Array); ok { for _, elem := range ary.Value { if str, ok := elem.(*ast.String); ok { @@ -524,9 +525,9 @@ func buildFilter(tbl *ast.Table) models.Filter { // buildInput parses input specific items from the ast.Table, // builds the filter and returns a -// models.InputConfig to be inserted into models.RunningInput -func buildInput(name string, tbl *ast.Table) (*models.InputConfig, error) { - cp := &models.InputConfig{Name: name} +// internal_models.InputConfig to be inserted into internal_models.RunningInput +func buildInput(name string, tbl *ast.Table) (*internal_models.InputConfig, error) { + cp := &internal_models.InputConfig{Name: name} if node, ok := tbl.Fields["interval"]; ok { if kv, ok := node.(*ast.KeyValue); ok { if str, ok := kv.Value.(*ast.String); ok { @@ -583,10 +584,10 @@ func buildInput(name string, tbl *ast.Table) (*models.InputConfig, error) { } // buildOutput parses output specific items from the ast.Table, builds the filter and returns an -// models.OutputConfig to be inserted into models.RunningInput +// internal_models.OutputConfig to be inserted into internal_models.RunningInput // Note: error exists in the return for future calls that might require error -func buildOutput(name string, tbl *ast.Table) (*models.OutputConfig, error) { - oc := &models.OutputConfig{ +func buildOutput(name string, tbl *ast.Table) (*internal_models.OutputConfig, error) { + oc := &internal_models.OutputConfig{ Name: name, Filter: buildFilter(tbl), } diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 92f45ad0a..261057875 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -19,19 +19,19 @@ func TestConfig_LoadSingleInput(t *testing.T) { memcached := inputs.Inputs["memcached"]().(*memcached.Memcached) memcached.Servers = []string{"localhost"} - mConfig := &models.InputConfig{ + mConfig := &internal_models.InputConfig{ Name: "memcached", - Filter: models.Filter{ + Filter: internal_models.Filter{ Drop: []string{"other", "stuff"}, Pass: []string{"some", "strings"}, - TagDrop: []models.TagFilter{ - models.TagFilter{ + TagDrop: []internal_models.TagFilter{ + internal_models.TagFilter{ Name: "badtag", Filter: []string{"othertag"}, }, }, - TagPass: []models.TagFilter{ - models.TagFilter{ + TagPass: []internal_models.TagFilter{ + internal_models.TagFilter{ Name: "goodtag", Filter: []string{"mytag"}, }, @@ -62,19 +62,19 @@ func TestConfig_LoadDirectory(t *testing.T) { memcached := inputs.Inputs["memcached"]().(*memcached.Memcached) memcached.Servers = []string{"localhost"} - mConfig := &models.InputConfig{ + mConfig := &internal_models.InputConfig{ Name: "memcached", - Filter: models.Filter{ + Filter: internal_models.Filter{ Drop: []string{"other", "stuff"}, Pass: []string{"some", "strings"}, - TagDrop: []models.TagFilter{ - models.TagFilter{ + TagDrop: []internal_models.TagFilter{ + internal_models.TagFilter{ Name: "badtag", Filter: []string{"othertag"}, }, }, - TagPass: []models.TagFilter{ - models.TagFilter{ + TagPass: []internal_models.TagFilter{ + internal_models.TagFilter{ Name: "goodtag", Filter: []string{"mytag"}, }, @@ -92,7 +92,7 @@ func TestConfig_LoadDirectory(t *testing.T) { ex := inputs.Inputs["exec"]().(*exec.Exec) ex.Command = "/usr/bin/myothercollector --foo=bar" - eConfig := &models.InputConfig{ + eConfig := &internal_models.InputConfig{ Name: "exec", MeasurementSuffix: "_myothercollector", } @@ -111,7 +111,7 @@ func TestConfig_LoadDirectory(t *testing.T) { pstat := inputs.Inputs["procstat"]().(*procstat.Procstat) pstat.PidFile = "/var/run/grafana-server.pid" - pConfig := &models.InputConfig{Name: "procstat"} + pConfig := &internal_models.InputConfig{Name: "procstat"} pConfig.Tags = make(map[string]string) assert.Equal(t, pstat, c.Inputs[3].Input, diff --git a/internal/models/filter.go b/internal/models/filter.go index 3f171ccac..06fe636cb 100644 --- a/internal/models/filter.go +++ b/internal/models/filter.go @@ -1,4 +1,4 @@ -package models +package internal_models import ( "strings" diff --git a/internal/models/filter_test.go b/internal/models/filter_test.go index 9e962e420..320c38407 100644 --- a/internal/models/filter_test.go +++ b/internal/models/filter_test.go @@ -1,4 +1,4 @@ -package models +package internal_models import ( "testing" diff --git a/internal/models/running_input.go b/internal/models/running_input.go index 17c0d2129..cffaf336c 100644 --- a/internal/models/running_input.go +++ b/internal/models/running_input.go @@ -1,14 +1,14 @@ -package models +package internal_models import ( "time" - "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf" ) type RunningInput struct { Name string - Input inputs.Input + Input telegraf.Input Config *InputConfig } diff --git a/internal/models/running_output.go b/internal/models/running_output.go index 196ebdc8a..f0dae6db1 100644 --- a/internal/models/running_output.go +++ b/internal/models/running_output.go @@ -1,10 +1,10 @@ -package models +package internal_models import ( "log" "time" - "github.com/influxdata/telegraf/plugins/outputs" + "github.com/influxdata/telegraf" "github.com/influxdata/influxdb/client/v2" ) @@ -13,7 +13,7 @@ const DEFAULT_POINT_BUFFER_LIMIT = 10000 type RunningOutput struct { Name string - Output outputs.Output + Output telegraf.Output Config *OutputConfig Quiet bool PointBufferLimit int @@ -24,7 +24,7 @@ type RunningOutput struct { func NewRunningOutput( name string, - output outputs.Output, + output telegraf.Output, conf *OutputConfig, ) *RunningOutput { ro := &RunningOutput{ diff --git a/metric.go b/metric.go new file mode 100644 index 000000000..fcbb1b291 --- /dev/null +++ b/metric.go @@ -0,0 +1,112 @@ +package telegraf + +import ( + "time" + + "github.com/influxdata/influxdb/client/v2" + "github.com/influxdata/influxdb/models" +) + +type Metric interface { + // Name returns the measurement name of the metric + Name() string + + // Name returns the tags associated with the metric + Tags() map[string]string + + // Time return the timestamp for the metric + Time() time.Time + + // UnixNano returns the unix nano time of the metric + UnixNano() int64 + + // Fields returns the fields for the metric + Fields() map[string]interface{} + + // String returns a line-protocol string of the metric + String() string + + // PrecisionString returns a line-protocol string of the metric, at precision + PrecisionString(precison string) string + + // Point returns a influxdb client.Point object + Point() *client.Point +} + +// metric is a wrapper of the influxdb client.Point struct +type metric struct { + pt *client.Point +} + +// NewMetric returns a metric with the given timestamp. If a timestamp is not +// given, then data is sent to the database without a timestamp, in which case +// the server will assign local time upon reception. NOTE: it is recommended to +// send data with a timestamp. +func NewMetric( + name string, + tags map[string]string, + fields map[string]interface{}, + t ...time.Time, +) (Metric, error) { + var T time.Time + if len(t) > 0 { + T = t[0] + } + + pt, err := client.NewPoint(name, tags, fields, T) + if err != nil { + return nil, err + } + return &metric{ + pt: pt, + }, nil +} + +// ParseMetrics returns a slice of Metrics from a text representation of a +// metric (in line-protocol format) +// with each metric separated by newlines. If any metrics fail to parse, +// a non-nil error will be returned in addition to the metrics that parsed +// successfully. +func ParseMetrics(buf []byte) ([]Metric, error) { + points, err := models.ParsePoints(buf) + metrics := make([]Metric, len(points)) + for i, point := range points { + // Ignore error here because it's impossible that a model.Point + // wouldn't parse into client.Point properly + metrics[i], _ = NewMetric(point.Name(), point.Tags(), + point.Fields(), point.Time()) + } + return metrics, err +} + +func (m *metric) Name() string { + return m.pt.Name() +} + +func (m *metric) Tags() map[string]string { + return m.pt.Tags() +} + +func (m *metric) Time() time.Time { + return m.pt.Time() +} + +func (m *metric) UnixNano() int64 { + return m.pt.UnixNano() +} + +func (m *metric) Fields() map[string]interface{} { + return m.pt.Fields() +} + +func (m *metric) String() string { + return m.pt.String() +} + +func (m *metric) PrecisionString(precison string) string { + return m.pt.PrecisionString(precison) +} + +func (m *metric) Point() *client.Point { + return m.pt +} diff --git a/output.go b/output.go new file mode 100644 index 000000000..39b1778b9 --- /dev/null +++ b/output.go @@ -0,0 +1,63 @@ +package telegraf + +import "github.com/influxdata/influxdb/client/v2" + +// type Output interface { +// // Connect to the Output +// Connect() error +// // Close any connections to the Output +// Close() error +// // Description returns a one-sentence description on the Output +// Description() string +// // SampleConfig returns the default configuration of the Output +// SampleConfig() string +// // Write takes in group of points to be written to the Output +// Write(metrics []Metric) error +// } + +// type ServiceOutput interface { +// // Connect to the Output +// Connect() error +// // Close any connections to the Output +// Close() error +// // Description returns a one-sentence description on the Output +// Description() string +// // SampleConfig returns the default configuration of the Output +// SampleConfig() string +// // Write takes in group of points to be written to the Output +// Write(metrics []Metric) error +// // Start the "service" that will provide an Output +// Start() error +// // Stop the "service" that will provide an Output +// Stop() +// } + +type Output interface { + // Connect to the Output + Connect() error + // Close any connections to the Output + Close() error + // Description returns a one-sentence description on the Output + Description() string + // SampleConfig returns the default configuration of the Output + SampleConfig() string + // Write takes in group of points to be written to the Output + Write(points []*client.Point) error +} + +type ServiceOutput interface { + // Connect to the Output + Connect() error + // Close any connections to the Output + Close() error + // Description returns a one-sentence description on the Output + Description() string + // SampleConfig returns the default configuration of the Output + SampleConfig() string + // Write takes in group of points to be written to the Output + Write(points []*client.Point) error + // Start the "service" that will provide an Output + Start() error + // Stop the "service" that will provide an Output + Stop() +} diff --git a/plugins/inputs/aerospike/aerospike.go b/plugins/inputs/aerospike/aerospike.go index aa015a4c0..00a396451 100644 --- a/plugins/inputs/aerospike/aerospike.go +++ b/plugins/inputs/aerospike/aerospike.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/binary" "fmt" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" "net" "strconv" @@ -119,7 +120,7 @@ func (a *Aerospike) Description() string { return "Read stats from an aerospike server" } -func (a *Aerospike) Gather(acc inputs.Accumulator) error { +func (a *Aerospike) Gather(acc telegraf.Accumulator) error { if len(a.Servers) == 0 { return a.gatherServer("127.0.0.1:3000", acc) } @@ -140,7 +141,7 @@ func (a *Aerospike) Gather(acc inputs.Accumulator) error { return outerr } -func (a *Aerospike) gatherServer(host string, acc inputs.Accumulator) error { +func (a *Aerospike) gatherServer(host string, acc telegraf.Accumulator) error { aerospikeInfo, err := getMap(STATISTICS_COMMAND, host) if err != nil { return fmt.Errorf("Aerospike info failed: %s", err) @@ -249,7 +250,7 @@ func get(key []byte, host string) (map[string]string, error) { func readAerospikeStats( stats map[string]string, - acc inputs.Accumulator, + acc telegraf.Accumulator, host string, namespace string, ) { @@ -336,7 +337,7 @@ func msgLenFromBytes(buf [6]byte) int64 { } func init() { - inputs.Add("aerospike", func() inputs.Input { + inputs.Add("aerospike", func() telegraf.Input { return &Aerospike{} }) } diff --git a/plugins/inputs/apache/apache.go b/plugins/inputs/apache/apache.go index 317a635d3..1256afd3f 100644 --- a/plugins/inputs/apache/apache.go +++ b/plugins/inputs/apache/apache.go @@ -11,6 +11,7 @@ import ( "sync" "time" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -31,7 +32,7 @@ func (n *Apache) Description() string { return "Read Apache status information (mod_status)" } -func (n *Apache) Gather(acc inputs.Accumulator) error { +func (n *Apache) Gather(acc telegraf.Accumulator) error { var wg sync.WaitGroup var outerr error @@ -59,7 +60,7 @@ var tr = &http.Transport{ var client = &http.Client{Transport: tr} -func (n *Apache) gatherUrl(addr *url.URL, acc inputs.Accumulator) error { +func (n *Apache) gatherUrl(addr *url.URL, acc telegraf.Accumulator) error { resp, err := client.Get(addr.String()) if err != nil { return fmt.Errorf("error making HTTP request to %s: %s", addr.String(), err) @@ -164,7 +165,7 @@ func getTags(addr *url.URL) map[string]string { } func init() { - inputs.Add("apache", func() inputs.Input { + inputs.Add("apache", func() telegraf.Input { return &Apache{} }) } diff --git a/plugins/inputs/bcache/bcache.go b/plugins/inputs/bcache/bcache.go index b6d6eb130..9e0e19665 100644 --- a/plugins/inputs/bcache/bcache.go +++ b/plugins/inputs/bcache/bcache.go @@ -8,6 +8,7 @@ import ( "strconv" "strings" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -69,7 +70,7 @@ func prettyToBytes(v string) uint64 { return uint64(result) } -func (b *Bcache) gatherBcache(bdev string, acc inputs.Accumulator) error { +func (b *Bcache) gatherBcache(bdev string, acc telegraf.Accumulator) error { tags := getTags(bdev) metrics, err := filepath.Glob(bdev + "/stats_total/*") if len(metrics) < 0 { @@ -104,7 +105,7 @@ func (b *Bcache) gatherBcache(bdev string, acc inputs.Accumulator) error { return nil } -func (b *Bcache) Gather(acc inputs.Accumulator) error { +func (b *Bcache) Gather(acc telegraf.Accumulator) error { bcacheDevsChecked := make(map[string]bool) var restrictDevs bool if len(b.BcacheDevs) != 0 { @@ -135,7 +136,7 @@ func (b *Bcache) Gather(acc inputs.Accumulator) error { } func init() { - inputs.Add("bcache", func() inputs.Input { + inputs.Add("bcache", func() telegraf.Input { return &Bcache{} }) } diff --git a/plugins/inputs/disque/disque.go b/plugins/inputs/disque/disque.go index 364e78fbc..f1ca1b800 100644 --- a/plugins/inputs/disque/disque.go +++ b/plugins/inputs/disque/disque.go @@ -10,6 +10,7 @@ import ( "strings" "sync" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -61,7 +62,7 @@ var ErrProtocolError = errors.New("disque protocol error") // Reads stats from all configured servers accumulates stats. // Returns one of the errors encountered while gather stats (if any). -func (g *Disque) Gather(acc inputs.Accumulator) error { +func (g *Disque) Gather(acc telegraf.Accumulator) error { if len(g.Servers) == 0 { url := &url.URL{ Host: ":7711", @@ -98,7 +99,7 @@ func (g *Disque) Gather(acc inputs.Accumulator) error { const defaultPort = "7711" -func (g *Disque) gatherServer(addr *url.URL, acc inputs.Accumulator) error { +func (g *Disque) gatherServer(addr *url.URL, acc telegraf.Accumulator) error { if g.c == nil { _, _, err := net.SplitHostPort(addr.Host) @@ -198,7 +199,7 @@ func (g *Disque) gatherServer(addr *url.URL, acc inputs.Accumulator) error { } func init() { - inputs.Add("disque", func() inputs.Input { + inputs.Add("disque", func() telegraf.Input { return &Disque{} }) } diff --git a/plugins/inputs/docker/docker.go b/plugins/inputs/docker/docker.go index 0e96dd176..1b95d9dae 100644 --- a/plugins/inputs/docker/docker.go +++ b/plugins/inputs/docker/docker.go @@ -6,6 +6,7 @@ import ( "sync" "time" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" "github.com/fsouza/go-dockerclient" @@ -33,7 +34,7 @@ func (d *Docker) Description() string { func (d *Docker) SampleConfig() string { return sampleConfig } -func (d *Docker) Gather(acc inputs.Accumulator) error { +func (d *Docker) Gather(acc telegraf.Accumulator) error { if d.client == nil { var c *docker.Client var err error @@ -80,7 +81,7 @@ func (d *Docker) Gather(acc inputs.Accumulator) error { func (d *Docker) gatherContainer( container docker.APIContainers, - acc inputs.Accumulator, + acc telegraf.Accumulator, ) error { // Parse container name cname := "unknown" @@ -129,7 +130,7 @@ func (d *Docker) gatherContainer( func gatherContainerStats( stat *docker.Stats, - acc inputs.Accumulator, + acc telegraf.Accumulator, tags map[string]string, ) { now := stat.Read @@ -212,7 +213,7 @@ func gatherContainerStats( func gatherBlockIOMetrics( stat *docker.Stats, - acc inputs.Accumulator, + acc telegraf.Accumulator, tags map[string]string, now time.Time, ) { @@ -303,7 +304,7 @@ func sliceContains(in string, sl []string) bool { } func init() { - inputs.Add("docker", func() inputs.Input { + inputs.Add("docker", func() telegraf.Input { return &Docker{} }) } diff --git a/plugins/inputs/elasticsearch/elasticsearch.go b/plugins/inputs/elasticsearch/elasticsearch.go index 304e0e3d7..2dbd6f357 100644 --- a/plugins/inputs/elasticsearch/elasticsearch.go +++ b/plugins/inputs/elasticsearch/elasticsearch.go @@ -9,6 +9,7 @@ import ( "sync" "time" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -95,13 +96,13 @@ func (e *Elasticsearch) Description() string { // Gather reads the stats from Elasticsearch and writes it to the // Accumulator. -func (e *Elasticsearch) Gather(acc inputs.Accumulator) error { +func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error { errChan := make(chan error, len(e.Servers)) var wg sync.WaitGroup wg.Add(len(e.Servers)) for _, serv := range e.Servers { - go func(s string, acc inputs.Accumulator) { + go func(s string, acc telegraf.Accumulator) { defer wg.Done() var url string if e.Local { @@ -133,7 +134,7 @@ func (e *Elasticsearch) Gather(acc inputs.Accumulator) error { return errors.New(strings.Join(errStrings, "\n")) } -func (e *Elasticsearch) gatherNodeStats(url string, acc inputs.Accumulator) error { +func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) error { nodeStats := &struct { ClusterName string `json:"cluster_name"` Nodes map[string]*node `json:"nodes"` @@ -178,7 +179,7 @@ func (e *Elasticsearch) gatherNodeStats(url string, acc inputs.Accumulator) erro return nil } -func (e *Elasticsearch) gatherClusterStats(url string, acc inputs.Accumulator) error { +func (e *Elasticsearch) gatherClusterStats(url string, acc telegraf.Accumulator) error { clusterStats := &clusterHealth{} if err := e.gatherData(url, clusterStats); err != nil { return err @@ -243,7 +244,7 @@ func (e *Elasticsearch) gatherData(url string, v interface{}) error { } func init() { - inputs.Add("elasticsearch", func() inputs.Input { + inputs.Add("elasticsearch", func() telegraf.Input { return NewElasticsearch() }) } diff --git a/plugins/inputs/exec/exec.go b/plugins/inputs/exec/exec.go index 0fc0b098a..65be6bfaf 100644 --- a/plugins/inputs/exec/exec.go +++ b/plugins/inputs/exec/exec.go @@ -8,6 +8,7 @@ import ( "github.com/gonuts/go-shellquote" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -64,7 +65,7 @@ func (e *Exec) Description() string { return "Read flattened metrics from one or more commands that output JSON to stdout" } -func (e *Exec) Gather(acc inputs.Accumulator) error { +func (e *Exec) Gather(acc telegraf.Accumulator) error { out, err := e.runner.Run(e) if err != nil { return err @@ -88,7 +89,7 @@ func (e *Exec) Gather(acc inputs.Accumulator) error { } func init() { - inputs.Add("exec", func() inputs.Input { + inputs.Add("exec", func() telegraf.Input { return NewExec() }) } diff --git a/plugins/inputs/github_webhooks/github_webhooks.go b/plugins/inputs/github_webhooks/github_webhooks.go index f764a5136..b4c0c5659 100644 --- a/plugins/inputs/github_webhooks/github_webhooks.go +++ b/plugins/inputs/github_webhooks/github_webhooks.go @@ -9,11 +9,12 @@ import ( "sync" "github.com/gorilla/mux" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" ) func init() { - inputs.Add("github_webhooks", func() inputs.Input { return &GithubWebhooks{} }) + inputs.Add("github_webhooks", func() telegraf.Input { return &GithubWebhooks{} }) } type GithubWebhooks struct { @@ -40,7 +41,7 @@ func (gh *GithubWebhooks) Description() string { } // Writes the points from <-gh.in to the Accumulator -func (gh *GithubWebhooks) Gather(acc inputs.Accumulator) error { +func (gh *GithubWebhooks) Gather(acc telegraf.Accumulator) error { gh.Lock() defer gh.Unlock() for _, event := range gh.events { diff --git a/plugins/inputs/haproxy/haproxy.go b/plugins/inputs/haproxy/haproxy.go index c2e334424..7e02756b8 100644 --- a/plugins/inputs/haproxy/haproxy.go +++ b/plugins/inputs/haproxy/haproxy.go @@ -3,6 +3,7 @@ package haproxy import ( "encoding/csv" "fmt" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" "io" "net/http" @@ -104,7 +105,7 @@ func (r *haproxy) Description() string { // Reads stats from all configured servers accumulates stats. // Returns one of the errors encountered while gather stats (if any). -func (g *haproxy) Gather(acc inputs.Accumulator) error { +func (g *haproxy) Gather(acc telegraf.Accumulator) error { if len(g.Servers) == 0 { return g.gatherServer("http://127.0.0.1:1936", acc) } @@ -126,7 +127,7 @@ func (g *haproxy) Gather(acc inputs.Accumulator) error { return outerr } -func (g *haproxy) gatherServer(addr string, acc inputs.Accumulator) error { +func (g *haproxy) gatherServer(addr string, acc telegraf.Accumulator) error { if g.client == nil { client := &http.Client{} @@ -156,7 +157,7 @@ func (g *haproxy) gatherServer(addr string, acc inputs.Accumulator) error { return importCsvResult(res.Body, acc, u.Host) } -func importCsvResult(r io.Reader, acc inputs.Accumulator, host string) error { +func importCsvResult(r io.Reader, acc telegraf.Accumulator, host string) error { csv := csv.NewReader(r) result, err := csv.ReadAll() now := time.Now() @@ -358,7 +359,7 @@ func importCsvResult(r io.Reader, acc inputs.Accumulator, host string) error { } func init() { - inputs.Add("haproxy", func() inputs.Input { + inputs.Add("haproxy", func() telegraf.Input { return &haproxy{} }) } diff --git a/plugins/inputs/httpjson/httpjson.go b/plugins/inputs/httpjson/httpjson.go index bc4fe7899..ee5e296d3 100644 --- a/plugins/inputs/httpjson/httpjson.go +++ b/plugins/inputs/httpjson/httpjson.go @@ -11,6 +11,7 @@ import ( "sync" "time" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -88,7 +89,7 @@ func (h *HttpJson) Description() string { } // Gathers data for all servers. -func (h *HttpJson) Gather(acc inputs.Accumulator) error { +func (h *HttpJson) Gather(acc telegraf.Accumulator) error { var wg sync.WaitGroup errorChannel := make(chan error, len(h.Servers)) @@ -127,7 +128,7 @@ func (h *HttpJson) Gather(acc inputs.Accumulator) error { // Returns: // error: Any error that may have occurred func (h *HttpJson) gatherServer( - acc inputs.Accumulator, + acc telegraf.Accumulator, serverURL string, ) error { resp, responseTime, err := h.sendRequest(serverURL) @@ -232,7 +233,7 @@ func (h *HttpJson) sendRequest(serverURL string) (string, float64, error) { } func init() { - inputs.Add("httpjson", func() inputs.Input { + inputs.Add("httpjson", func() telegraf.Input { return &HttpJson{client: RealHTTPClient{client: &http.Client{}}} }) } diff --git a/plugins/inputs/influxdb/influxdb.go b/plugins/inputs/influxdb/influxdb.go index 311f6ba0c..e8350ddca 100644 --- a/plugins/inputs/influxdb/influxdb.go +++ b/plugins/inputs/influxdb/influxdb.go @@ -8,6 +8,7 @@ import ( "strings" "sync" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -32,7 +33,7 @@ func (*InfluxDB) SampleConfig() string { ` } -func (i *InfluxDB) Gather(acc inputs.Accumulator) error { +func (i *InfluxDB) Gather(acc telegraf.Accumulator) error { errorChannel := make(chan error, len(i.URLs)) var wg sync.WaitGroup @@ -77,7 +78,7 @@ type point struct { // Returns: // error: Any error that may have occurred func (i *InfluxDB) gatherURL( - acc inputs.Accumulator, + acc telegraf.Accumulator, url string, ) error { resp, err := http.Get(url) @@ -140,7 +141,7 @@ func (i *InfluxDB) gatherURL( } func init() { - inputs.Add("influxdb", func() inputs.Input { + inputs.Add("influxdb", func() telegraf.Input { return &InfluxDB{} }) } diff --git a/plugins/inputs/jolokia/jolokia.go b/plugins/inputs/jolokia/jolokia.go index 7579ecb4a..fcf5d0bad 100644 --- a/plugins/inputs/jolokia/jolokia.go +++ b/plugins/inputs/jolokia/jolokia.go @@ -8,6 +8,7 @@ import ( "net/http" "net/url" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -108,7 +109,7 @@ func (j *Jolokia) getAttr(requestUrl *url.URL) (map[string]interface{}, error) { return jsonOut, nil } -func (j *Jolokia) Gather(acc inputs.Accumulator) error { +func (j *Jolokia) Gather(acc telegraf.Accumulator) error { context := j.Context //"/jolokia/read" servers := j.Servers metrics := j.Metrics @@ -157,7 +158,7 @@ func (j *Jolokia) Gather(acc inputs.Accumulator) error { } func init() { - inputs.Add("jolokia", func() inputs.Input { + inputs.Add("jolokia", func() telegraf.Input { return &Jolokia{jClient: &JolokiaClientImpl{client: &http.Client{}}} }) } diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index a0f1d3d11..b25c32a4d 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -6,6 +6,7 @@ import ( "sync" "github.com/influxdata/influxdb/models" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" "github.com/Shopify/sarama" @@ -148,7 +149,7 @@ func (k *Kafka) Stop() { } } -func (k *Kafka) Gather(acc inputs.Accumulator) error { +func (k *Kafka) Gather(acc telegraf.Accumulator) error { k.Lock() defer k.Unlock() npoints := len(k.pointChan) @@ -160,7 +161,7 @@ func (k *Kafka) Gather(acc inputs.Accumulator) error { } func init() { - inputs.Add("kafka_consumer", func() inputs.Input { + inputs.Add("kafka_consumer", func() telegraf.Input { return &Kafka{} }) } diff --git a/plugins/inputs/leofs/leofs.go b/plugins/inputs/leofs/leofs.go index f4dd314b7..d186b328f 100644 --- a/plugins/inputs/leofs/leofs.go +++ b/plugins/inputs/leofs/leofs.go @@ -3,6 +3,7 @@ package leofs import ( "bufio" "fmt" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" "net/url" "os/exec" @@ -146,7 +147,7 @@ func (l *LeoFS) Description() string { return "Read metrics from a LeoFS Server via SNMP" } -func (l *LeoFS) Gather(acc inputs.Accumulator) error { +func (l *LeoFS) Gather(acc telegraf.Accumulator) error { if len(l.Servers) == 0 { l.gatherServer(defaultEndpoint, ServerTypeManagerMaster, acc) return nil @@ -176,7 +177,7 @@ func (l *LeoFS) Gather(acc inputs.Accumulator) error { return outerr } -func (l *LeoFS) gatherServer(endpoint string, serverType ServerType, acc inputs.Accumulator) error { +func (l *LeoFS) gatherServer(endpoint string, serverType ServerType, acc telegraf.Accumulator) error { cmd := exec.Command("snmpwalk", "-v2c", "-cpublic", endpoint, oid) stdout, err := cmd.StdoutPipe() if err != nil { @@ -225,7 +226,7 @@ func retrieveTokenAfterColon(line string) (string, error) { } func init() { - inputs.Add("leofs", func() inputs.Input { + inputs.Add("leofs", func() telegraf.Input { return &LeoFS{} }) } diff --git a/plugins/inputs/lustre2/lustre2.go b/plugins/inputs/lustre2/lustre2.go index d6266de73..cf57c5c65 100644 --- a/plugins/inputs/lustre2/lustre2.go +++ b/plugins/inputs/lustre2/lustre2.go @@ -13,6 +13,7 @@ import ( "strconv" "strings" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -129,7 +130,7 @@ var wanted_mds_fields = []*mapping{ }, } -func (l *Lustre2) GetLustreProcStats(fileglob string, wanted_fields []*mapping, acc inputs.Accumulator) error { +func (l *Lustre2) GetLustreProcStats(fileglob string, wanted_fields []*mapping, acc telegraf.Accumulator) error { files, err := filepath.Glob(fileglob) if err != nil { return err @@ -193,7 +194,7 @@ func (l *Lustre2) Description() string { } // Gather reads stats from all lustre targets -func (l *Lustre2) Gather(acc inputs.Accumulator) error { +func (l *Lustre2) Gather(acc telegraf.Accumulator) error { l.allFields = make(map[string]map[string]interface{}) if len(l.Ost_procfiles) == 0 { @@ -244,7 +245,7 @@ func (l *Lustre2) Gather(acc inputs.Accumulator) error { } func init() { - inputs.Add("lustre2", func() inputs.Input { + inputs.Add("lustre2", func() telegraf.Input { return &Lustre2{} }) } diff --git a/plugins/inputs/mailchimp/mailchimp.go b/plugins/inputs/mailchimp/mailchimp.go index 284ac61e1..35ea38858 100644 --- a/plugins/inputs/mailchimp/mailchimp.go +++ b/plugins/inputs/mailchimp/mailchimp.go @@ -4,6 +4,7 @@ import ( "fmt" "time" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -34,7 +35,7 @@ func (m *MailChimp) Description() string { return "Gathers metrics from the /3.0/reports MailChimp API" } -func (m *MailChimp) Gather(acc inputs.Accumulator) error { +func (m *MailChimp) Gather(acc telegraf.Accumulator) error { if m.api == nil { m.api = NewChimpAPI(m.ApiKey) } @@ -71,7 +72,7 @@ func (m *MailChimp) Gather(acc inputs.Accumulator) error { return nil } -func gatherReport(acc inputs.Accumulator, report Report, now time.Time) { +func gatherReport(acc telegraf.Accumulator, report Report, now time.Time) { tags := make(map[string]string) tags["id"] = report.ID tags["campaign_title"] = report.CampaignTitle @@ -110,7 +111,7 @@ func gatherReport(acc inputs.Accumulator, report Report, now time.Time) { } func init() { - inputs.Add("mailchimp", func() inputs.Input { + inputs.Add("mailchimp", func() telegraf.Input { return &MailChimp{} }) } diff --git a/plugins/inputs/memcached/memcached.go b/plugins/inputs/memcached/memcached.go index 078f05aa3..d27e6a099 100644 --- a/plugins/inputs/memcached/memcached.go +++ b/plugins/inputs/memcached/memcached.go @@ -8,6 +8,7 @@ import ( "strconv" "time" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -69,7 +70,7 @@ func (m *Memcached) Description() string { } // Gather reads stats from all configured servers accumulates stats -func (m *Memcached) Gather(acc inputs.Accumulator) error { +func (m *Memcached) Gather(acc telegraf.Accumulator) error { if len(m.Servers) == 0 && len(m.UnixSockets) == 0 { return m.gatherServer(":11211", false, acc) } @@ -92,7 +93,7 @@ func (m *Memcached) Gather(acc inputs.Accumulator) error { func (m *Memcached) gatherServer( address string, unix bool, - acc inputs.Accumulator, + acc telegraf.Accumulator, ) error { var conn net.Conn if unix { @@ -178,7 +179,7 @@ func parseResponse(r *bufio.Reader) (map[string]string, error) { } func init() { - inputs.Add("memcached", func() inputs.Input { + inputs.Add("memcached", func() telegraf.Input { return &Memcached{} }) } diff --git a/plugins/inputs/mock_Plugin.go b/plugins/inputs/mock_Plugin.go index 87dd14884..caf30f72f 100644 --- a/plugins/inputs/mock_Plugin.go +++ b/plugins/inputs/mock_Plugin.go @@ -1,12 +1,16 @@ package inputs -import "github.com/stretchr/testify/mock" +import ( + "github.com/influxdata/telegraf" + + "github.com/stretchr/testify/mock" +) type MockPlugin struct { mock.Mock } -func (m *MockPlugin) Gather(_a0 Accumulator) error { +func (m *MockPlugin) Gather(_a0 telegraf.Accumulator) error { ret := m.Called(_a0) r0 := ret.Error(0) diff --git a/plugins/inputs/mongodb/mongodb.go b/plugins/inputs/mongodb/mongodb.go index ce73c3a14..b0cf492c0 100644 --- a/plugins/inputs/mongodb/mongodb.go +++ b/plugins/inputs/mongodb/mongodb.go @@ -9,6 +9,7 @@ import ( "sync" "time" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" "gopkg.in/mgo.v2" ) @@ -45,7 +46,7 @@ var localhost = &url.URL{Host: "127.0.0.1:27017"} // Reads stats from all configured servers accumulates stats. // Returns one of the errors encountered while gather stats (if any). -func (m *MongoDB) Gather(acc inputs.Accumulator) error { +func (m *MongoDB) Gather(acc telegraf.Accumulator) error { if len(m.Servers) == 0 { m.gatherServer(m.getMongoServer(localhost), acc) return nil @@ -88,7 +89,7 @@ func (m *MongoDB) getMongoServer(url *url.URL) *Server { return m.mongos[url.Host] } -func (m *MongoDB) gatherServer(server *Server, acc inputs.Accumulator) error { +func (m *MongoDB) gatherServer(server *Server, acc telegraf.Accumulator) error { if server.Session == nil { var dialAddrs []string if server.Url.User != nil { @@ -138,7 +139,7 @@ func (m *MongoDB) gatherServer(server *Server, acc inputs.Accumulator) error { } func init() { - inputs.Add("mongodb", func() inputs.Input { + inputs.Add("mongodb", func() telegraf.Input { return &MongoDB{ mongos: make(map[string]*Server), } diff --git a/plugins/inputs/mongodb/mongodb_data.go b/plugins/inputs/mongodb/mongodb_data.go index c0c68c330..1a951806d 100644 --- a/plugins/inputs/mongodb/mongodb_data.go +++ b/plugins/inputs/mongodb/mongodb_data.go @@ -5,7 +5,7 @@ import ( "reflect" "strconv" - "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf" ) type MongodbData struct { @@ -97,7 +97,7 @@ func (d *MongodbData) add(key string, val interface{}) { d.Fields[key] = val } -func (d *MongodbData) flush(acc inputs.Accumulator) { +func (d *MongodbData) flush(acc telegraf.Accumulator) { acc.AddFields( "mongodb", d.Fields, diff --git a/plugins/inputs/mongodb/mongodb_server.go b/plugins/inputs/mongodb/mongodb_server.go index 87552f906..26aac2b63 100644 --- a/plugins/inputs/mongodb/mongodb_server.go +++ b/plugins/inputs/mongodb/mongodb_server.go @@ -4,7 +4,7 @@ import ( "net/url" "time" - "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf" "gopkg.in/mgo.v2" "gopkg.in/mgo.v2/bson" ) @@ -21,7 +21,7 @@ func (s *Server) getDefaultTags() map[string]string { return tags } -func (s *Server) gatherData(acc inputs.Accumulator) error { +func (s *Server) gatherData(acc telegraf.Accumulator) error { s.Session.SetMode(mgo.Eventual, true) s.Session.SetSocketTimeout(0) result := &ServerStatus{} diff --git a/plugins/inputs/mysql/mysql.go b/plugins/inputs/mysql/mysql.go index 7434a282a..07a739d01 100644 --- a/plugins/inputs/mysql/mysql.go +++ b/plugins/inputs/mysql/mysql.go @@ -6,6 +6,7 @@ import ( "strings" _ "github.com/go-sql-driver/mysql" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -35,7 +36,7 @@ func (m *Mysql) Description() string { var localhost = "" -func (m *Mysql) Gather(acc inputs.Accumulator) error { +func (m *Mysql) Gather(acc telegraf.Accumulator) error { if len(m.Servers) == 0 { // if we can't get stats in this case, thats fine, don't report // an error. @@ -113,7 +114,7 @@ var mappings = []*mapping{ }, } -func (m *Mysql) gatherServer(serv string, acc inputs.Accumulator) error { +func (m *Mysql) gatherServer(serv string, acc telegraf.Accumulator) error { // If user forgot the '/', add it if strings.HasSuffix(serv, ")") { serv = serv + "/" @@ -207,7 +208,7 @@ func (m *Mysql) gatherServer(serv string, acc inputs.Accumulator) error { } func init() { - inputs.Add("mysql", func() inputs.Input { + inputs.Add("mysql", func() telegraf.Input { return &Mysql{} }) } diff --git a/plugins/inputs/nginx/nginx.go b/plugins/inputs/nginx/nginx.go index 6ea665b7e..126c48f54 100644 --- a/plugins/inputs/nginx/nginx.go +++ b/plugins/inputs/nginx/nginx.go @@ -11,6 +11,7 @@ import ( "sync" "time" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -31,7 +32,7 @@ func (n *Nginx) Description() string { return "Read Nginx's basic status information (ngx_http_stub_status_module)" } -func (n *Nginx) Gather(acc inputs.Accumulator) error { +func (n *Nginx) Gather(acc telegraf.Accumulator) error { var wg sync.WaitGroup var outerr error @@ -59,7 +60,7 @@ var tr = &http.Transport{ var client = &http.Client{Transport: tr} -func (n *Nginx) gatherUrl(addr *url.URL, acc inputs.Accumulator) error { +func (n *Nginx) gatherUrl(addr *url.URL, acc telegraf.Accumulator) error { resp, err := client.Get(addr.String()) if err != nil { return fmt.Errorf("error making HTTP request to %s: %s", addr.String(), err) @@ -159,7 +160,7 @@ func getTags(addr *url.URL) map[string]string { } func init() { - inputs.Add("nginx", func() inputs.Input { + inputs.Add("nginx", func() telegraf.Input { return &Nginx{} }) } diff --git a/plugins/inputs/nsq/nsq.go b/plugins/inputs/nsq/nsq.go index 9b680a0db..82c09a4bb 100644 --- a/plugins/inputs/nsq/nsq.go +++ b/plugins/inputs/nsq/nsq.go @@ -31,6 +31,7 @@ import ( "sync" "time" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -49,7 +50,7 @@ const ( ) func init() { - inputs.Add("nsq", func() inputs.Input { + inputs.Add("nsq", func() telegraf.Input { return &NSQ{} }) } @@ -62,7 +63,7 @@ func (n *NSQ) Description() string { return "Read NSQ topic and channel statistics." } -func (n *NSQ) Gather(acc inputs.Accumulator) error { +func (n *NSQ) Gather(acc telegraf.Accumulator) error { var wg sync.WaitGroup var outerr error @@ -85,7 +86,7 @@ var tr = &http.Transport{ var client = &http.Client{Transport: tr} -func (n *NSQ) gatherEndpoint(e string, acc inputs.Accumulator) error { +func (n *NSQ) gatherEndpoint(e string, acc telegraf.Accumulator) error { u, err := buildURL(e) if err != nil { return err @@ -136,7 +137,7 @@ func buildURL(e string) (*url.URL, error) { return addr, nil } -func topicStats(t TopicStats, acc inputs.Accumulator, host, version string) { +func topicStats(t TopicStats, acc telegraf.Accumulator, host, version string) { // per topic overall (tag: name, paused, channel count) tags := map[string]string{ "server_host": host, @@ -157,7 +158,7 @@ func topicStats(t TopicStats, acc inputs.Accumulator, host, version string) { } } -func channelStats(c ChannelStats, acc inputs.Accumulator, host, version, topic string) { +func channelStats(c ChannelStats, acc telegraf.Accumulator, host, version, topic string) { tags := map[string]string{ "server_host": host, "server_version": version, @@ -182,7 +183,7 @@ func channelStats(c ChannelStats, acc inputs.Accumulator, host, version, topic s } } -func clientStats(c ClientStats, acc inputs.Accumulator, host, version, topic, channel string) { +func clientStats(c ClientStats, acc telegraf.Accumulator, host, version, topic, channel string) { tags := map[string]string{ "server_host": host, "server_version": version, diff --git a/plugins/inputs/passenger/passenger.go b/plugins/inputs/passenger/passenger.go index c5b049b7c..a91e8503c 100644 --- a/plugins/inputs/passenger/passenger.go +++ b/plugins/inputs/passenger/passenger.go @@ -8,6 +8,7 @@ import ( "strconv" "strings" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" "golang.org/x/net/html/charset" ) @@ -145,7 +146,7 @@ func (r *passenger) Description() string { return "Read metrics of passenger using passenger-status" } -func (g *passenger) Gather(acc inputs.Accumulator) error { +func (g *passenger) Gather(acc telegraf.Accumulator) error { if g.Command == "" { g.Command = "passenger-status -v --show=xml" } @@ -164,7 +165,7 @@ func (g *passenger) Gather(acc inputs.Accumulator) error { return nil } -func importMetric(stat []byte, acc inputs.Accumulator) error { +func importMetric(stat []byte, acc telegraf.Accumulator) error { var p info decoder := xml.NewDecoder(bytes.NewReader(stat)) @@ -244,7 +245,7 @@ func importMetric(stat []byte, acc inputs.Accumulator) error { } func init() { - inputs.Add("passenger", func() inputs.Input { + inputs.Add("passenger", func() telegraf.Input { return &passenger{} }) } diff --git a/plugins/inputs/phpfpm/phpfpm.go b/plugins/inputs/phpfpm/phpfpm.go index 0166f7bea..2e9bc417a 100644 --- a/plugins/inputs/phpfpm/phpfpm.go +++ b/plugins/inputs/phpfpm/phpfpm.go @@ -12,6 +12,7 @@ import ( "strings" "sync" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -73,7 +74,7 @@ func (r *phpfpm) Description() string { // Reads stats from all configured servers accumulates stats. // Returns one of the errors encountered while gather stats (if any). -func (g *phpfpm) Gather(acc inputs.Accumulator) error { +func (g *phpfpm) Gather(acc telegraf.Accumulator) error { if len(g.Urls) == 0 { return g.gatherServer("http://127.0.0.1/status", acc) } @@ -96,7 +97,7 @@ func (g *phpfpm) Gather(acc inputs.Accumulator) error { } // Request status page to get stat raw data and import it -func (g *phpfpm) gatherServer(addr string, acc inputs.Accumulator) error { +func (g *phpfpm) gatherServer(addr string, acc telegraf.Accumulator) error { if g.client == nil { client := &http.Client{} g.client = client @@ -140,7 +141,7 @@ func (g *phpfpm) gatherServer(addr string, acc inputs.Accumulator) error { } // Gather stat using fcgi protocol -func (g *phpfpm) gatherFcgi(fcgi *conn, statusPath string, acc inputs.Accumulator) error { +func (g *phpfpm) gatherFcgi(fcgi *conn, statusPath string, acc telegraf.Accumulator) error { fpmOutput, fpmErr, err := fcgi.Request(map[string]string{ "SCRIPT_NAME": "/" + statusPath, "SCRIPT_FILENAME": statusPath, @@ -160,7 +161,7 @@ func (g *phpfpm) gatherFcgi(fcgi *conn, statusPath string, acc inputs.Accumulato } // Gather stat using http protocol -func (g *phpfpm) gatherHttp(addr string, acc inputs.Accumulator) error { +func (g *phpfpm) gatherHttp(addr string, acc telegraf.Accumulator) error { u, err := url.Parse(addr) if err != nil { return fmt.Errorf("Unable parse server address '%s': %s", addr, err) @@ -184,7 +185,7 @@ func (g *phpfpm) gatherHttp(addr string, acc inputs.Accumulator) error { } // Import stat data into Telegraf system -func importMetric(r io.Reader, acc inputs.Accumulator) (poolStat, error) { +func importMetric(r io.Reader, acc telegraf.Accumulator) (poolStat, error) { stats := make(poolStat) var currentPool string @@ -239,7 +240,7 @@ func importMetric(r io.Reader, acc inputs.Accumulator) (poolStat, error) { } func init() { - inputs.Add("phpfpm", func() inputs.Input { + inputs.Add("phpfpm", func() telegraf.Input { return &phpfpm{} }) } diff --git a/plugins/inputs/ping/ping.go b/plugins/inputs/ping/ping.go index aa1d5bf36..9c8c9bba8 100644 --- a/plugins/inputs/ping/ping.go +++ b/plugins/inputs/ping/ping.go @@ -7,6 +7,7 @@ import ( "strings" "sync" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -56,7 +57,7 @@ func (_ *Ping) SampleConfig() string { return sampleConfig } -func (p *Ping) Gather(acc inputs.Accumulator) error { +func (p *Ping) Gather(acc telegraf.Accumulator) error { var wg sync.WaitGroup errorChannel := make(chan error, len(p.Urls)*2) @@ -64,7 +65,7 @@ func (p *Ping) Gather(acc inputs.Accumulator) error { // Spin off a go routine for each url to ping for _, url := range p.Urls { wg.Add(1) - go func(url string, acc inputs.Accumulator) { + go func(url string, acc telegraf.Accumulator) { defer wg.Done() args := p.args(url) out, err := p.pingHost(args...) @@ -176,7 +177,7 @@ func processPingOutput(out string) (int, int, float64, error) { } func init() { - inputs.Add("ping", func() inputs.Input { + inputs.Add("ping", func() telegraf.Input { return &Ping{pingHost: hostPinger} }) } diff --git a/plugins/inputs/postgresql/postgresql.go b/plugins/inputs/postgresql/postgresql.go index 3398f5ac0..3a9815af6 100644 --- a/plugins/inputs/postgresql/postgresql.go +++ b/plugins/inputs/postgresql/postgresql.go @@ -6,6 +6,7 @@ import ( "fmt" "strings" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" _ "github.com/lib/pq" @@ -53,7 +54,7 @@ func (p *Postgresql) IgnoredColumns() map[string]bool { var localhost = "host=localhost sslmode=disable" -func (p *Postgresql) Gather(acc inputs.Accumulator) error { +func (p *Postgresql) Gather(acc telegraf.Accumulator) error { var query string if p.Address == "" || p.Address == "localhost" { @@ -101,7 +102,7 @@ type scanner interface { Scan(dest ...interface{}) error } -func (p *Postgresql) accRow(row scanner, acc inputs.Accumulator) error { +func (p *Postgresql) accRow(row scanner, acc telegraf.Accumulator) error { var columnVars []interface{} var dbname bytes.Buffer @@ -145,7 +146,7 @@ func (p *Postgresql) accRow(row scanner, acc inputs.Accumulator) error { } func init() { - inputs.Add("postgresql", func() inputs.Input { + inputs.Add("postgresql", func() telegraf.Input { return &Postgresql{} }) } diff --git a/plugins/inputs/procstat/procstat.go b/plugins/inputs/procstat/procstat.go index fd8158ec7..eaaea2843 100644 --- a/plugins/inputs/procstat/procstat.go +++ b/plugins/inputs/procstat/procstat.go @@ -10,6 +10,7 @@ import ( "github.com/shirou/gopsutil/process" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -49,7 +50,7 @@ func (_ *Procstat) Description() string { return "Monitor process cpu and memory usage" } -func (p *Procstat) Gather(acc inputs.Accumulator) error { +func (p *Procstat) Gather(acc telegraf.Accumulator) error { err := p.createProcesses() if err != nil { log.Printf("Error: procstat getting process, exe: [%s] pidfile: [%s] pattern: [%s] %s", @@ -175,7 +176,7 @@ func pidsFromPattern(pattern string) ([]int32, error) { } func init() { - inputs.Add("procstat", func() inputs.Input { + inputs.Add("procstat", func() telegraf.Input { return NewProcstat() }) } diff --git a/plugins/inputs/procstat/spec_processor.go b/plugins/inputs/procstat/spec_processor.go index b66572f2e..b09ed4f21 100644 --- a/plugins/inputs/procstat/spec_processor.go +++ b/plugins/inputs/procstat/spec_processor.go @@ -6,14 +6,14 @@ import ( "github.com/shirou/gopsutil/process" - "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf" ) type SpecProcessor struct { Prefix string tags map[string]string fields map[string]interface{} - acc inputs.Accumulator + acc telegraf.Accumulator proc *process.Process } @@ -34,7 +34,7 @@ func (p *SpecProcessor) flush() { func NewSpecProcessor( prefix string, - acc inputs.Accumulator, + acc telegraf.Accumulator, p *process.Process, ) *SpecProcessor { tags := make(map[string]string) diff --git a/plugins/inputs/prometheus/prometheus.go b/plugins/inputs/prometheus/prometheus.go index e6374b8d6..9db0dd165 100644 --- a/plugins/inputs/prometheus/prometheus.go +++ b/plugins/inputs/prometheus/prometheus.go @@ -3,6 +3,7 @@ package prometheus import ( "errors" "fmt" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" "github.com/prometheus/common/expfmt" "github.com/prometheus/common/model" @@ -32,7 +33,7 @@ var ErrProtocolError = errors.New("prometheus protocol error") // Reads stats from all configured servers accumulates stats. // Returns one of the errors encountered while gather stats (if any). -func (g *Prometheus) Gather(acc inputs.Accumulator) error { +func (g *Prometheus) Gather(acc telegraf.Accumulator) error { var wg sync.WaitGroup var outerr error @@ -50,7 +51,7 @@ func (g *Prometheus) Gather(acc inputs.Accumulator) error { return outerr } -func (g *Prometheus) gatherURL(url string, acc inputs.Accumulator) error { +func (g *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error { resp, err := http.Get(url) if err != nil { return fmt.Errorf("error making HTTP request to %s: %s", url, err) @@ -97,7 +98,7 @@ func (g *Prometheus) gatherURL(url string, acc inputs.Accumulator) error { } func init() { - inputs.Add("prometheus", func() inputs.Input { + inputs.Add("prometheus", func() telegraf.Input { return &Prometheus{} }) } diff --git a/plugins/inputs/puppetagent/puppetagent.go b/plugins/inputs/puppetagent/puppetagent.go index eee9186b3..693d0aaff 100644 --- a/plugins/inputs/puppetagent/puppetagent.go +++ b/plugins/inputs/puppetagent/puppetagent.go @@ -8,6 +8,7 @@ import ( "reflect" "strings" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -82,7 +83,7 @@ func (pa *PuppetAgent) Description() string { } // Gather reads stats from all configured servers accumulates stats -func (pa *PuppetAgent) Gather(acc inputs.Accumulator) error { +func (pa *PuppetAgent) Gather(acc telegraf.Accumulator) error { if len(pa.Location) == 0 { pa.Location = "/var/lib/puppet/state/last_run_summary.yaml" @@ -110,7 +111,7 @@ func (pa *PuppetAgent) Gather(acc inputs.Accumulator) error { return nil } -func structPrinter(s *State, acc inputs.Accumulator, tags map[string]string) { +func structPrinter(s *State, acc telegraf.Accumulator, tags map[string]string) { e := reflect.ValueOf(s).Elem() fields := make(map[string]interface{}) @@ -131,7 +132,7 @@ func structPrinter(s *State, acc inputs.Accumulator, tags map[string]string) { } func init() { - inputs.Add("puppetagent", func() inputs.Input { + inputs.Add("puppetagent", func() telegraf.Input { return &PuppetAgent{} }) } diff --git a/plugins/inputs/rabbitmq/rabbitmq.go b/plugins/inputs/rabbitmq/rabbitmq.go index 103484e78..87edc1ee9 100644 --- a/plugins/inputs/rabbitmq/rabbitmq.go +++ b/plugins/inputs/rabbitmq/rabbitmq.go @@ -7,6 +7,7 @@ import ( "strconv" "time" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -96,7 +97,7 @@ type Node struct { SocketsUsed int64 `json:"sockets_used"` } -type gatherFunc func(r *RabbitMQ, acc inputs.Accumulator, errChan chan error) +type gatherFunc func(r *RabbitMQ, acc telegraf.Accumulator, errChan chan error) var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues} @@ -119,7 +120,7 @@ func (r *RabbitMQ) Description() string { return "Read metrics from one or many RabbitMQ servers via the management API" } -func (r *RabbitMQ) Gather(acc inputs.Accumulator) error { +func (r *RabbitMQ) Gather(acc telegraf.Accumulator) error { if r.Client == nil { r.Client = &http.Client{} } @@ -172,7 +173,7 @@ func (r *RabbitMQ) requestJSON(u string, target interface{}) error { return nil } -func gatherOverview(r *RabbitMQ, acc inputs.Accumulator, errChan chan error) { +func gatherOverview(r *RabbitMQ, acc telegraf.Accumulator, errChan chan error) { overview := &OverviewResponse{} err := r.requestJSON("/api/overview", &overview) @@ -208,7 +209,7 @@ func gatherOverview(r *RabbitMQ, acc inputs.Accumulator, errChan chan error) { errChan <- nil } -func gatherNodes(r *RabbitMQ, acc inputs.Accumulator, errChan chan error) { +func gatherNodes(r *RabbitMQ, acc telegraf.Accumulator, errChan chan error) { nodes := make([]Node, 0) // Gather information about nodes err := r.requestJSON("/api/nodes", &nodes) @@ -245,7 +246,7 @@ func gatherNodes(r *RabbitMQ, acc inputs.Accumulator, errChan chan error) { errChan <- nil } -func gatherQueues(r *RabbitMQ, acc inputs.Accumulator, errChan chan error) { +func gatherQueues(r *RabbitMQ, acc telegraf.Accumulator, errChan chan error) { // Gather information about queues queues := make([]Queue, 0) err := r.requestJSON("/api/queues", &queues) @@ -330,7 +331,7 @@ func (r *RabbitMQ) shouldGatherQueue(queue Queue) bool { } func init() { - inputs.Add("rabbitmq", func() inputs.Input { + inputs.Add("rabbitmq", func() telegraf.Input { return &RabbitMQ{} }) } diff --git a/plugins/inputs/redis/redis.go b/plugins/inputs/redis/redis.go index 735aa2052..221ed3c15 100644 --- a/plugins/inputs/redis/redis.go +++ b/plugins/inputs/redis/redis.go @@ -10,6 +10,7 @@ import ( "strings" "sync" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -76,7 +77,7 @@ var ErrProtocolError = errors.New("redis protocol error") // Reads stats from all configured servers accumulates stats. // Returns one of the errors encountered while gather stats (if any). -func (r *Redis) Gather(acc inputs.Accumulator) error { +func (r *Redis) Gather(acc telegraf.Accumulator) error { if len(r.Servers) == 0 { url := &url.URL{ Host: ":6379", @@ -113,7 +114,7 @@ func (r *Redis) Gather(acc inputs.Accumulator) error { const defaultPort = "6379" -func (r *Redis) gatherServer(addr *url.URL, acc inputs.Accumulator) error { +func (r *Redis) gatherServer(addr *url.URL, acc telegraf.Accumulator) error { _, _, err := net.SplitHostPort(addr.Host) if err != nil { addr.Host = addr.Host + ":" + defaultPort @@ -158,7 +159,7 @@ func (r *Redis) gatherServer(addr *url.URL, acc inputs.Accumulator) error { // gatherInfoOutput gathers func gatherInfoOutput( rdr *bufio.Reader, - acc inputs.Accumulator, + acc telegraf.Accumulator, tags map[string]string, ) error { var keyspace_hits, keyspace_misses uint64 = 0, 0 @@ -227,7 +228,7 @@ func gatherInfoOutput( func gatherKeyspaceLine( name string, line string, - acc inputs.Accumulator, + acc telegraf.Accumulator, tags map[string]string, ) { if strings.Contains(line, "keys=") { @@ -246,7 +247,7 @@ func gatherKeyspaceLine( } func init() { - inputs.Add("redis", func() inputs.Input { + inputs.Add("redis", func() telegraf.Input { return &Redis{} }) } diff --git a/plugins/inputs/registry.go b/plugins/inputs/registry.go index 2b99078f0..9951cd5cd 100644 --- a/plugins/inputs/registry.go +++ b/plugins/inputs/registry.go @@ -1,53 +1,8 @@ package inputs -import "time" +import "github.com/influxdata/telegraf" -type Accumulator interface { - // Create a point with a value, decorating it with tags - // NOTE: tags is expected to be owned by the caller, don't mutate - // it after passing to Add. - Add(measurement string, - value interface{}, - tags map[string]string, - t ...time.Time) - - AddFields(measurement string, - fields map[string]interface{}, - tags map[string]string, - t ...time.Time) -} - -type Input interface { - // SampleConfig returns the default configuration of the Input - SampleConfig() string - - // Description returns a one-sentence description on the Input - Description() string - - // Gather takes in an accumulator and adds the metrics that the Input - // gathers. This is called every "interval" - Gather(Accumulator) error -} - -type ServiceInput interface { - // SampleConfig returns the default configuration of the Input - SampleConfig() string - - // Description returns a one-sentence description on the Input - Description() string - - // Gather takes in an accumulator and adds the metrics that the Input - // gathers. This is called every "interval" - Gather(Accumulator) error - - // Start starts the ServiceInput's service, whatever that may be - Start() error - - // Stop stops the services and closes any necessary channels and connections - Stop() -} - -type Creator func() Input +type Creator func() telegraf.Input var Inputs = map[string]Creator{} diff --git a/plugins/inputs/rethinkdb/rethinkdb.go b/plugins/inputs/rethinkdb/rethinkdb.go index 1f28dab25..4d72fed55 100644 --- a/plugins/inputs/rethinkdb/rethinkdb.go +++ b/plugins/inputs/rethinkdb/rethinkdb.go @@ -5,6 +5,7 @@ import ( "net/url" "sync" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" "gopkg.in/dancannon/gorethink.v1" @@ -35,7 +36,7 @@ var localhost = &Server{Url: &url.URL{Host: "127.0.0.1:28015"}} // Reads stats from all configured servers accumulates stats. // Returns one of the errors encountered while gather stats (if any). -func (r *RethinkDB) Gather(acc inputs.Accumulator) error { +func (r *RethinkDB) Gather(acc telegraf.Accumulator) error { if len(r.Servers) == 0 { r.gatherServer(localhost, acc) return nil @@ -65,7 +66,7 @@ func (r *RethinkDB) Gather(acc inputs.Accumulator) error { return outerr } -func (r *RethinkDB) gatherServer(server *Server, acc inputs.Accumulator) error { +func (r *RethinkDB) gatherServer(server *Server, acc telegraf.Accumulator) error { var err error connectOpts := gorethink.ConnectOpts{ Address: server.Url.Host, @@ -87,7 +88,7 @@ func (r *RethinkDB) gatherServer(server *Server, acc inputs.Accumulator) error { } func init() { - inputs.Add("rethinkdb", func() inputs.Input { + inputs.Add("rethinkdb", func() telegraf.Input { return &RethinkDB{} }) } diff --git a/plugins/inputs/rethinkdb/rethinkdb_data.go b/plugins/inputs/rethinkdb/rethinkdb_data.go index 8093fa5ba..ca4ac7552 100644 --- a/plugins/inputs/rethinkdb/rethinkdb_data.go +++ b/plugins/inputs/rethinkdb/rethinkdb_data.go @@ -4,7 +4,7 @@ import ( "reflect" "time" - "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf" ) type serverStatus struct { @@ -88,7 +88,7 @@ var engineStats = map[string]string{ func (e *Engine) AddEngineStats( keys []string, - acc inputs.Accumulator, + acc telegraf.Accumulator, tags map[string]string, ) { engine := reflect.ValueOf(e).Elem() @@ -99,7 +99,7 @@ func (e *Engine) AddEngineStats( acc.AddFields("rethinkdb_engine", fields, tags) } -func (s *Storage) AddStats(acc inputs.Accumulator, tags map[string]string) { +func (s *Storage) AddStats(acc telegraf.Accumulator, tags map[string]string) { fields := map[string]interface{}{ "cache_bytes_in_use": s.Cache.BytesInUse, "disk_read_bytes_per_sec": s.Disk.ReadBytesPerSec, diff --git a/plugins/inputs/rethinkdb/rethinkdb_server.go b/plugins/inputs/rethinkdb/rethinkdb_server.go index 6ca7a3af1..98e2a35f0 100644 --- a/plugins/inputs/rethinkdb/rethinkdb_server.go +++ b/plugins/inputs/rethinkdb/rethinkdb_server.go @@ -9,7 +9,7 @@ import ( "strconv" "strings" - "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf" "gopkg.in/dancannon/gorethink.v1" ) @@ -20,7 +20,7 @@ type Server struct { serverStatus serverStatus } -func (s *Server) gatherData(acc inputs.Accumulator) error { +func (s *Server) gatherData(acc telegraf.Accumulator) error { if err := s.getServerStatus(); err != nil { return fmt.Errorf("Failed to get server_status, %s\n", err) } @@ -110,7 +110,7 @@ var ClusterTracking = []string{ "written_docs_per_sec", } -func (s *Server) addClusterStats(acc inputs.Accumulator) error { +func (s *Server) addClusterStats(acc telegraf.Accumulator) error { cursor, err := gorethink.DB("rethinkdb").Table("stats").Get([]string{"cluster"}).Run(s.session) if err != nil { return fmt.Errorf("cluster stats query error, %s\n", err.Error()) @@ -138,7 +138,7 @@ var MemberTracking = []string{ "total_writes", } -func (s *Server) addMemberStats(acc inputs.Accumulator) error { +func (s *Server) addMemberStats(acc telegraf.Accumulator) error { cursor, err := gorethink.DB("rethinkdb").Table("stats").Get([]string{"server", s.serverStatus.Id}).Run(s.session) if err != nil { return fmt.Errorf("member stats query error, %s\n", err.Error()) @@ -162,7 +162,7 @@ var TableTracking = []string{ "total_writes", } -func (s *Server) addTableStats(acc inputs.Accumulator) error { +func (s *Server) addTableStats(acc telegraf.Accumulator) error { tablesCursor, err := gorethink.DB("rethinkdb").Table("table_status").Run(s.session) defer tablesCursor.Close() var tables []tableStatus diff --git a/plugins/inputs/sensors/sensors.go b/plugins/inputs/sensors/sensors.go index 81001abd8..7cfd02a8a 100644 --- a/plugins/inputs/sensors/sensors.go +++ b/plugins/inputs/sensors/sensors.go @@ -7,6 +7,7 @@ import ( "github.com/md14454/gosensors" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -35,7 +36,7 @@ func (_ *Sensors) SampleConfig() string { return sensorsSampleConfig } -func (s *Sensors) Gather(acc inputs.Accumulator) error { +func (s *Sensors) Gather(acc telegraf.Accumulator) error { gosensors.Init() defer gosensors.Cleanup() @@ -84,7 +85,7 @@ func (s *Sensors) Gather(acc inputs.Accumulator) error { } func init() { - inputs.Add("sensors", func() inputs.Input { + inputs.Add("sensors", func() telegraf.Input { return &Sensors{} }) } diff --git a/plugins/inputs/snmp/snmp.go b/plugins/inputs/snmp/snmp.go index bebb54bdc..50b78fc9b 100644 --- a/plugins/inputs/snmp/snmp.go +++ b/plugins/inputs/snmp/snmp.go @@ -9,6 +9,7 @@ import ( "strings" "time" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" "github.com/soniah/gosnmp" @@ -187,7 +188,7 @@ func findnodename(node Node, ids []string) (string, string) { return node.name, "" } -func (s *Snmp) Gather(acc inputs.Accumulator) error { +func (s *Snmp) Gather(acc telegraf.Accumulator) error { // Create oid tree if s.SnmptranslateFile != "" && len(initNode.subnodes) == 0 { data, err := ioutil.ReadFile(s.SnmptranslateFile) @@ -283,7 +284,7 @@ func (s *Snmp) Gather(acc inputs.Accumulator) error { return nil } -func (h *Host) SNMPGet(acc inputs.Accumulator) error { +func (h *Host) SNMPGet(acc telegraf.Accumulator) error { // Get snmp client snmpClient, err := h.GetSNMPClient() if err != nil { @@ -324,7 +325,7 @@ func (h *Host) SNMPGet(acc inputs.Accumulator) error { return nil } -func (h *Host) SNMPBulk(acc inputs.Accumulator) error { +func (h *Host) SNMPBulk(acc telegraf.Accumulator) error { // Get snmp client snmpClient, err := h.GetSNMPClient() if err != nil { @@ -411,7 +412,7 @@ func (h *Host) GetSNMPClient() (*gosnmp.GoSNMP, error) { return snmpClient, nil } -func (h *Host) HandleResponse(oids map[string]Data, result *gosnmp.SnmpPacket, acc inputs.Accumulator) (string, error) { +func (h *Host) HandleResponse(oids map[string]Data, result *gosnmp.SnmpPacket, acc telegraf.Accumulator) (string, error) { var lastOid string for _, variable := range result.Variables { lastOid = variable.Name @@ -467,7 +468,7 @@ func (h *Host) HandleResponse(oids map[string]Data, result *gosnmp.SnmpPacket, a } func init() { - inputs.Add("snmp", func() inputs.Input { + inputs.Add("snmp", func() telegraf.Input { return &Snmp{} }) } diff --git a/plugins/inputs/sqlserver/sqlserver.go b/plugins/inputs/sqlserver/sqlserver.go index 4c6214822..3a67f065d 100644 --- a/plugins/inputs/sqlserver/sqlserver.go +++ b/plugins/inputs/sqlserver/sqlserver.go @@ -2,6 +2,7 @@ package sqlserver import ( "database/sql" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" "sync" "time" @@ -70,7 +71,7 @@ func initQueries() { } // Gather collect data from SQL Server -func (s *SQLServer) Gather(acc inputs.Accumulator) error { +func (s *SQLServer) Gather(acc telegraf.Accumulator) error { initQueries() if len(s.Servers) == 0 { @@ -94,7 +95,7 @@ func (s *SQLServer) Gather(acc inputs.Accumulator) error { return outerr } -func (s *SQLServer) gatherServer(server string, query Query, acc inputs.Accumulator) error { +func (s *SQLServer) gatherServer(server string, query Query, acc telegraf.Accumulator) error { // deferred opening conn, err := sql.Open("mssql", server) if err != nil { @@ -130,7 +131,7 @@ func (s *SQLServer) gatherServer(server string, query Query, acc inputs.Accumula return rows.Err() } -func (s *SQLServer) accRow(query Query, acc inputs.Accumulator, row scanner) error { +func (s *SQLServer) accRow(query Query, acc telegraf.Accumulator, row scanner) error { var columnVars []interface{} var fields = make(map[string]interface{}) @@ -180,7 +181,7 @@ func (s *SQLServer) accRow(query Query, acc inputs.Accumulator, row scanner) err } func init() { - inputs.Add("sqlserver", func() inputs.Input { + inputs.Add("sqlserver", func() telegraf.Input { return &SQLServer{} }) } diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go index 6b7a427b7..7bccd846e 100644 --- a/plugins/inputs/statsd/statsd.go +++ b/plugins/inputs/statsd/statsd.go @@ -12,6 +12,7 @@ import ( "github.com/influxdata/influxdb/services/graphite" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -156,7 +157,7 @@ func (_ *Statsd) SampleConfig() string { return sampleConfig } -func (s *Statsd) Gather(acc inputs.Accumulator) error { +func (s *Statsd) Gather(acc telegraf.Accumulator) error { s.Lock() defer s.Unlock() @@ -515,7 +516,7 @@ func (s *Statsd) Stop() { } func init() { - inputs.Add("statsd", func() inputs.Input { + inputs.Add("statsd", func() telegraf.Input { return &Statsd{ ConvertNames: true, UDPPacketSize: UDP_PACKET_SIZE, diff --git a/plugins/inputs/system/cpu.go b/plugins/inputs/system/cpu.go index 95c854b2c..b8edfca9e 100644 --- a/plugins/inputs/system/cpu.go +++ b/plugins/inputs/system/cpu.go @@ -4,6 +4,7 @@ import ( "fmt" "time" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" "github.com/shirou/gopsutil/cpu" ) @@ -39,7 +40,7 @@ func (_ *CPUStats) SampleConfig() string { return sampleConfig } -func (s *CPUStats) Gather(acc inputs.Accumulator) error { +func (s *CPUStats) Gather(acc telegraf.Accumulator) error { times, err := s.ps.CPUTimes(s.PerCPU, s.TotalCPU) if err != nil { return fmt.Errorf("error getting CPU info: %s", err) @@ -111,7 +112,7 @@ func totalCpuTime(t cpu.CPUTimesStat) float64 { } func init() { - inputs.Add("cpu", func() inputs.Input { + inputs.Add("cpu", func() telegraf.Input { return &CPUStats{ps: &systemPS{}} }) } diff --git a/plugins/inputs/system/disk.go b/plugins/inputs/system/disk.go index aeddd3f9b..1481f6a91 100644 --- a/plugins/inputs/system/disk.go +++ b/plugins/inputs/system/disk.go @@ -3,6 +3,7 @@ package system import ( "fmt" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -29,7 +30,7 @@ func (_ *DiskStats) SampleConfig() string { return diskSampleConfig } -func (s *DiskStats) Gather(acc inputs.Accumulator) error { +func (s *DiskStats) Gather(acc telegraf.Accumulator) error { // Legacy support: if len(s.Mountpoints) != 0 { s.MountPoints = s.Mountpoints @@ -90,7 +91,7 @@ func (_ *DiskIOStats) SampleConfig() string { return diskIoSampleConfig } -func (s *DiskIOStats) Gather(acc inputs.Accumulator) error { +func (s *DiskIOStats) Gather(acc telegraf.Accumulator) error { diskio, err := s.ps.DiskIO() if err != nil { return fmt.Errorf("error getting disk io info: %s", err) @@ -136,11 +137,11 @@ func (s *DiskIOStats) Gather(acc inputs.Accumulator) error { } func init() { - inputs.Add("disk", func() inputs.Input { + inputs.Add("disk", func() telegraf.Input { return &DiskStats{ps: &systemPS{}} }) - inputs.Add("diskio", func() inputs.Input { + inputs.Add("diskio", func() telegraf.Input { return &DiskIOStats{ps: &systemPS{}} }) } diff --git a/plugins/inputs/system/memory.go b/plugins/inputs/system/memory.go index 32a2f2b09..82ce9c9c8 100644 --- a/plugins/inputs/system/memory.go +++ b/plugins/inputs/system/memory.go @@ -3,6 +3,7 @@ package system import ( "fmt" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -16,7 +17,7 @@ func (_ *MemStats) Description() string { func (_ *MemStats) SampleConfig() string { return "" } -func (s *MemStats) Gather(acc inputs.Accumulator) error { +func (s *MemStats) Gather(acc telegraf.Accumulator) error { vm, err := s.ps.VMStat() if err != nil { return fmt.Errorf("error getting virtual memory info: %s", err) @@ -47,7 +48,7 @@ func (_ *SwapStats) Description() string { func (_ *SwapStats) SampleConfig() string { return "" } -func (s *SwapStats) Gather(acc inputs.Accumulator) error { +func (s *SwapStats) Gather(acc telegraf.Accumulator) error { swap, err := s.ps.SwapStat() if err != nil { return fmt.Errorf("error getting swap memory info: %s", err) @@ -67,11 +68,11 @@ func (s *SwapStats) Gather(acc inputs.Accumulator) error { } func init() { - inputs.Add("mem", func() inputs.Input { + inputs.Add("mem", func() telegraf.Input { return &MemStats{ps: &systemPS{}} }) - inputs.Add("swap", func() inputs.Input { + inputs.Add("swap", func() telegraf.Input { return &SwapStats{ps: &systemPS{}} }) } diff --git a/plugins/inputs/system/net.go b/plugins/inputs/system/net.go index 7f71f5200..451892d37 100644 --- a/plugins/inputs/system/net.go +++ b/plugins/inputs/system/net.go @@ -5,6 +5,7 @@ import ( "net" "strings" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -31,7 +32,7 @@ func (_ *NetIOStats) SampleConfig() string { return netSampleConfig } -func (s *NetIOStats) Gather(acc inputs.Accumulator) error { +func (s *NetIOStats) Gather(acc telegraf.Accumulator) error { netio, err := s.ps.NetIO() if err != nil { return fmt.Errorf("error getting net io info: %s", err) @@ -103,7 +104,7 @@ func (s *NetIOStats) Gather(acc inputs.Accumulator) error { } func init() { - inputs.Add("net", func() inputs.Input { + inputs.Add("net", func() telegraf.Input { return &NetIOStats{ps: &systemPS{}} }) } diff --git a/plugins/inputs/system/netstat.go b/plugins/inputs/system/netstat.go index 0fe704ee0..4eab80e87 100644 --- a/plugins/inputs/system/netstat.go +++ b/plugins/inputs/system/netstat.go @@ -4,6 +4,7 @@ import ( "fmt" "syscall" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -21,7 +22,7 @@ func (_ *NetStats) SampleConfig() string { return tcpstatSampleConfig } -func (s *NetStats) Gather(acc inputs.Accumulator) error { +func (s *NetStats) Gather(acc telegraf.Accumulator) error { netconns, err := s.ps.NetConnections() if err != nil { return fmt.Errorf("error getting net connections info: %s", err) @@ -64,7 +65,7 @@ func (s *NetStats) Gather(acc inputs.Accumulator) error { } func init() { - inputs.Add("netstat", func() inputs.Input { + inputs.Add("netstat", func() telegraf.Input { return &NetStats{ps: &systemPS{}} }) } diff --git a/plugins/inputs/system/ps.go b/plugins/inputs/system/ps.go index 98c9b8b31..0a505bfc4 100644 --- a/plugins/inputs/system/ps.go +++ b/plugins/inputs/system/ps.go @@ -3,8 +3,8 @@ package system import ( "os" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" - "github.com/influxdata/telegraf/plugins/inputs" "github.com/shirou/gopsutil/cpu" "github.com/shirou/gopsutil/disk" @@ -23,7 +23,7 @@ type PS interface { NetConnections() ([]net.NetConnectionStat, error) } -func add(acc inputs.Accumulator, +func add(acc telegraf.Accumulator, name string, val float64, tags map[string]string) { if val >= 0 { acc.Add(name, val, tags) diff --git a/plugins/inputs/system/system.go b/plugins/inputs/system/system.go index 4a0a76d48..9922d5a92 100644 --- a/plugins/inputs/system/system.go +++ b/plugins/inputs/system/system.go @@ -8,6 +8,7 @@ import ( "github.com/shirou/gopsutil/host" "github.com/shirou/gopsutil/load" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -19,7 +20,7 @@ func (_ *SystemStats) Description() string { func (_ *SystemStats) SampleConfig() string { return "" } -func (_ *SystemStats) Gather(acc inputs.Accumulator) error { +func (_ *SystemStats) Gather(acc telegraf.Accumulator) error { loadavg, err := load.LoadAvg() if err != nil { return err @@ -68,7 +69,7 @@ func format_uptime(uptime uint64) string { } func init() { - inputs.Add("system", func() inputs.Input { + inputs.Add("system", func() telegraf.Input { return &SystemStats{} }) } diff --git a/plugins/inputs/trig/trig.go b/plugins/inputs/trig/trig.go index 604f9734a..e879f39ee 100644 --- a/plugins/inputs/trig/trig.go +++ b/plugins/inputs/trig/trig.go @@ -3,6 +3,7 @@ package trig import ( "math" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -24,7 +25,7 @@ func (s *Trig) Description() string { return "Inserts sine and cosine waves for demonstration purposes" } -func (s *Trig) Gather(acc inputs.Accumulator) error { +func (s *Trig) Gather(acc telegraf.Accumulator) error { sinner := math.Sin((s.x*math.Pi)/5.0) * s.Amplitude cosinner := math.Cos((s.x*math.Pi)/5.0) * s.Amplitude @@ -41,5 +42,5 @@ func (s *Trig) Gather(acc inputs.Accumulator) error { } func init() { - inputs.Add("Trig", func() inputs.Input { return &Trig{x: 0.0} }) + inputs.Add("Trig", func() telegraf.Input { return &Trig{x: 0.0} }) } diff --git a/plugins/inputs/twemproxy/twemproxy.go b/plugins/inputs/twemproxy/twemproxy.go index 6dcce8058..8d8349edb 100644 --- a/plugins/inputs/twemproxy/twemproxy.go +++ b/plugins/inputs/twemproxy/twemproxy.go @@ -7,6 +7,7 @@ import ( "net" "time" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -31,7 +32,7 @@ func (t *Twemproxy) Description() string { } // Gather data from all Twemproxy instances -func (t *Twemproxy) Gather(acc inputs.Accumulator) error { +func (t *Twemproxy) Gather(acc telegraf.Accumulator) error { conn, err := net.DialTimeout("tcp", t.Addr, 1*time.Second) if err != nil { return err @@ -55,7 +56,7 @@ func (t *Twemproxy) Gather(acc inputs.Accumulator) error { // Process Twemproxy server stats func (t *Twemproxy) processStat( - acc inputs.Accumulator, + acc telegraf.Accumulator, tags map[string]string, data map[string]interface{}, ) { @@ -89,7 +90,7 @@ func (t *Twemproxy) processStat( // Process pool data in Twemproxy stats func (t *Twemproxy) processPool( - acc inputs.Accumulator, + acc telegraf.Accumulator, tags map[string]string, data map[string]interface{}, ) { @@ -117,7 +118,7 @@ func (t *Twemproxy) processPool( // Process backend server(redis/memcached) stats func (t *Twemproxy) processServer( - acc inputs.Accumulator, + acc telegraf.Accumulator, tags map[string]string, data map[string]interface{}, ) { @@ -143,7 +144,7 @@ func copyTags(tags map[string]string) map[string]string { } func init() { - inputs.Add("twemproxy", func() inputs.Input { + inputs.Add("twemproxy", func() telegraf.Input { return &Twemproxy{} }) } diff --git a/plugins/inputs/zfs/zfs.go b/plugins/inputs/zfs/zfs.go index 13f2d9806..b6075a56f 100644 --- a/plugins/inputs/zfs/zfs.go +++ b/plugins/inputs/zfs/zfs.go @@ -6,6 +6,7 @@ import ( "strconv" "strings" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -68,7 +69,7 @@ func getTags(pools []poolInfo) map[string]string { return map[string]string{"pools": poolNames} } -func gatherPoolStats(pool poolInfo, acc inputs.Accumulator) error { +func gatherPoolStats(pool poolInfo, acc telegraf.Accumulator) error { lines, err := internal.ReadLines(pool.ioFilename) if err != nil { return err @@ -101,7 +102,7 @@ func gatherPoolStats(pool poolInfo, acc inputs.Accumulator) error { return nil } -func (z *Zfs) Gather(acc inputs.Accumulator) error { +func (z *Zfs) Gather(acc telegraf.Accumulator) error { kstatMetrics := z.KstatMetrics if len(kstatMetrics) == 0 { kstatMetrics = []string{"arcstats", "zfetchstats", "vdev_cache_stats"} @@ -149,7 +150,7 @@ func (z *Zfs) Gather(acc inputs.Accumulator) error { } func init() { - inputs.Add("zfs", func() inputs.Input { + inputs.Add("zfs", func() telegraf.Input { return &Zfs{} }) } diff --git a/plugins/inputs/zookeeper/zookeeper.go b/plugins/inputs/zookeeper/zookeeper.go index c2940f5e3..bd964d4cc 100644 --- a/plugins/inputs/zookeeper/zookeeper.go +++ b/plugins/inputs/zookeeper/zookeeper.go @@ -10,6 +10,7 @@ import ( "strings" "time" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -40,7 +41,7 @@ func (z *Zookeeper) Description() string { } // Gather reads stats from all configured servers accumulates stats -func (z *Zookeeper) Gather(acc inputs.Accumulator) error { +func (z *Zookeeper) Gather(acc telegraf.Accumulator) error { if len(z.Servers) == 0 { return nil } @@ -53,7 +54,7 @@ func (z *Zookeeper) Gather(acc inputs.Accumulator) error { return nil } -func (z *Zookeeper) gatherServer(address string, acc inputs.Accumulator) error { +func (z *Zookeeper) gatherServer(address string, acc telegraf.Accumulator) error { _, _, err := net.SplitHostPort(address) if err != nil { address = address + ":2181" @@ -103,7 +104,7 @@ func (z *Zookeeper) gatherServer(address string, acc inputs.Accumulator) error { } func init() { - inputs.Add("zookeeper", func() inputs.Input { + inputs.Add("zookeeper", func() telegraf.Input { return &Zookeeper{} }) } diff --git a/plugins/outputs/amon/amon.go b/plugins/outputs/amon/amon.go index e9f2c9f30..af3d37146 100644 --- a/plugins/outputs/amon/amon.go +++ b/plugins/outputs/amon/amon.go @@ -11,6 +11,7 @@ import ( "github.com/influxdata/influxdb/client/v2" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/outputs" + "github.com/influxdata/telegraf" ) type Amon struct { @@ -151,7 +152,7 @@ func (a *Amon) Close() error { } func init() { - outputs.Add("amon", func() outputs.Output { + outputs.Add("amon", func() telegraf.Output { return &Amon{} }) } diff --git a/plugins/outputs/amqp/amqp.go b/plugins/outputs/amqp/amqp.go index bdbf47b86..688ab6a47 100644 --- a/plugins/outputs/amqp/amqp.go +++ b/plugins/outputs/amqp/amqp.go @@ -12,6 +12,7 @@ import ( "github.com/influxdata/influxdb/client/v2" "github.com/influxdata/telegraf/plugins/outputs" + "github.com/influxdata/telegraf" "github.com/streadway/amqp" ) @@ -190,7 +191,7 @@ func (q *AMQP) Write(points []*client.Point) error { } func init() { - outputs.Add("amqp", func() outputs.Output { + outputs.Add("amqp", func() telegraf.Output { return &AMQP{ Database: DefaultDatabase, Precision: DefaultPrecision, diff --git a/plugins/outputs/cloudwatch/cloudwatch.go b/plugins/outputs/cloudwatch/cloudwatch.go index 1e20836da..f581a5219 100644 --- a/plugins/outputs/cloudwatch/cloudwatch.go +++ b/plugins/outputs/cloudwatch/cloudwatch.go @@ -16,6 +16,7 @@ import ( "github.com/influxdata/influxdb/client/v2" "github.com/influxdata/telegraf/plugins/outputs" + "github.com/influxdata/telegraf" ) type CloudWatch struct { @@ -230,7 +231,7 @@ func BuildDimensions(ptTags map[string]string) []*cloudwatch.Dimension { } func init() { - outputs.Add("cloudwatch", func() outputs.Output { + outputs.Add("cloudwatch", func() telegraf.Output { return &CloudWatch{} }) } diff --git a/plugins/outputs/datadog/datadog.go b/plugins/outputs/datadog/datadog.go index 7d6539789..78555fc1f 100644 --- a/plugins/outputs/datadog/datadog.go +++ b/plugins/outputs/datadog/datadog.go @@ -13,6 +13,7 @@ import ( "github.com/influxdata/influxdb/client/v2" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/outputs" + "github.com/influxdata/telegraf" ) type Datadog struct { @@ -173,7 +174,7 @@ func (d *Datadog) Close() error { } func init() { - outputs.Add("datadog", func() outputs.Output { + outputs.Add("datadog", func() telegraf.Output { return NewDatadog(datadog_api) }) } diff --git a/plugins/outputs/graphite/graphite.go b/plugins/outputs/graphite/graphite.go index f9781041f..f7e58672a 100644 --- a/plugins/outputs/graphite/graphite.go +++ b/plugins/outputs/graphite/graphite.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/influxdata/influxdb/client/v2" "github.com/influxdata/telegraf/plugins/outputs" + "github.com/influxdata/telegraf" "log" "math/rand" "net" @@ -128,7 +129,7 @@ func (g *Graphite) Write(points []*client.Point) error { } func init() { - outputs.Add("graphite", func() outputs.Output { + outputs.Add("graphite", func() telegraf.Output { return &Graphite{} }) } diff --git a/plugins/outputs/influxdb/influxdb.go b/plugins/outputs/influxdb/influxdb.go index f45f020b6..41bfba9ba 100644 --- a/plugins/outputs/influxdb/influxdb.go +++ b/plugins/outputs/influxdb/influxdb.go @@ -12,6 +12,7 @@ import ( "github.com/influxdata/influxdb/client/v2" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/outputs" + "github.com/influxdata/telegraf" ) type InfluxDB struct { @@ -156,7 +157,7 @@ func (i *InfluxDB) Write(points []*client.Point) error { } func init() { - outputs.Add("influxdb", func() outputs.Output { + outputs.Add("influxdb", func() telegraf.Output { return &InfluxDB{} }) } diff --git a/plugins/outputs/kafka/kafka.go b/plugins/outputs/kafka/kafka.go index b16347c92..37773b7d1 100644 --- a/plugins/outputs/kafka/kafka.go +++ b/plugins/outputs/kafka/kafka.go @@ -8,6 +8,7 @@ import ( "github.com/Shopify/sarama" "github.com/influxdata/influxdb/client/v2" "github.com/influxdata/telegraf/plugins/outputs" + "github.com/influxdata/telegraf" "io/ioutil" ) @@ -140,7 +141,7 @@ func (k *Kafka) Write(points []*client.Point) error { } func init() { - outputs.Add("kafka", func() outputs.Output { + outputs.Add("kafka", func() telegraf.Output { return &Kafka{} }) } diff --git a/plugins/outputs/kinesis/kinesis.go b/plugins/outputs/kinesis/kinesis.go index 23ca03c5e..4824a8cea 100644 --- a/plugins/outputs/kinesis/kinesis.go +++ b/plugins/outputs/kinesis/kinesis.go @@ -16,6 +16,7 @@ import ( "github.com/influxdata/influxdb/client/v2" "github.com/influxdata/telegraf/plugins/outputs" + "github.com/influxdata/telegraf" ) type KinesisOutput struct { @@ -172,7 +173,7 @@ func (k *KinesisOutput) Write(points []*client.Point) error { } func init() { - outputs.Add("kinesis", func() outputs.Output { + outputs.Add("kinesis", func() telegraf.Output { return &KinesisOutput{} }) } diff --git a/plugins/outputs/librato/librato.go b/plugins/outputs/librato/librato.go index 6afcb4542..9903b17b3 100644 --- a/plugins/outputs/librato/librato.go +++ b/plugins/outputs/librato/librato.go @@ -10,6 +10,7 @@ import ( "github.com/influxdata/influxdb/client/v2" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/outputs" + "github.com/influxdata/telegraf" ) type Librato struct { @@ -169,7 +170,7 @@ func (l *Librato) Close() error { } func init() { - outputs.Add("librato", func() outputs.Output { + outputs.Add("librato", func() telegraf.Output { return NewLibrato(librato_api) }) } diff --git a/plugins/outputs/mqtt/mqtt.go b/plugins/outputs/mqtt/mqtt.go index 7c47cf741..75a992a01 100644 --- a/plugins/outputs/mqtt/mqtt.go +++ b/plugins/outputs/mqtt/mqtt.go @@ -13,6 +13,7 @@ import ( "github.com/influxdata/influxdb/client/v2" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/outputs" + "github.com/influxdata/telegraf" ) const MaxClientIdLen = 8 @@ -184,7 +185,7 @@ func getCertPool(pemPath string) (*x509.CertPool, error) { } func init() { - outputs.Add("mqtt", func() outputs.Output { + outputs.Add("mqtt", func() telegraf.Output { return &MQTT{} }) } diff --git a/plugins/outputs/nsq/nsq.go b/plugins/outputs/nsq/nsq.go index 79818ec5c..94c636b44 100644 --- a/plugins/outputs/nsq/nsq.go +++ b/plugins/outputs/nsq/nsq.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/influxdata/influxdb/client/v2" "github.com/influxdata/telegraf/plugins/outputs" + "github.com/influxdata/telegraf" "github.com/nsqio/go-nsq" ) @@ -65,7 +66,7 @@ func (n *NSQ) Write(points []*client.Point) error { } func init() { - outputs.Add("nsq", func() outputs.Output { + outputs.Add("nsq", func() telegraf.Output { return &NSQ{} }) } diff --git a/plugins/outputs/opentsdb/opentsdb.go b/plugins/outputs/opentsdb/opentsdb.go index 6e9f3e26a..480738c26 100644 --- a/plugins/outputs/opentsdb/opentsdb.go +++ b/plugins/outputs/opentsdb/opentsdb.go @@ -10,6 +10,7 @@ import ( "github.com/influxdata/influxdb/client/v2" "github.com/influxdata/telegraf/plugins/outputs" + "github.com/influxdata/telegraf" ) type OpenTSDB struct { @@ -162,7 +163,7 @@ func (o *OpenTSDB) Close() error { } func init() { - outputs.Add("opentsdb", func() outputs.Output { + outputs.Add("opentsdb", func() telegraf.Output { return &OpenTSDB{} }) } diff --git a/plugins/outputs/prometheus_client/prometheus_client.go b/plugins/outputs/prometheus_client/prometheus_client.go index 4e429722a..34f85bb23 100644 --- a/plugins/outputs/prometheus_client/prometheus_client.go +++ b/plugins/outputs/prometheus_client/prometheus_client.go @@ -7,6 +7,7 @@ import ( "github.com/influxdata/influxdb/client/v2" "github.com/influxdata/telegraf/plugins/outputs" + "github.com/influxdata/telegraf" "github.com/prometheus/client_golang/prometheus" ) @@ -119,7 +120,7 @@ func (p *PrometheusClient) Write(points []*client.Point) error { } func init() { - outputs.Add("prometheus_client", func() outputs.Output { + outputs.Add("prometheus_client", func() telegraf.Output { return &PrometheusClient{} }) } diff --git a/plugins/outputs/registry.go b/plugins/outputs/registry.go index d4c6ba1e5..5787a82b0 100644 --- a/plugins/outputs/registry.go +++ b/plugins/outputs/registry.go @@ -1,40 +1,10 @@ package outputs import ( - "github.com/influxdata/influxdb/client/v2" + "github.com/influxdata/telegraf" ) -type Output interface { - // Connect to the Output - Connect() error - // Close any connections to the Output - Close() error - // Description returns a one-sentence description on the Output - Description() string - // SampleConfig returns the default configuration of the Output - SampleConfig() string - // Write takes in group of points to be written to the Output - Write(points []*client.Point) error -} - -type ServiceOutput interface { - // Connect to the Output - Connect() error - // Close any connections to the Output - Close() error - // Description returns a one-sentence description on the Output - Description() string - // SampleConfig returns the default configuration of the Output - SampleConfig() string - // Write takes in group of points to be written to the Output - Write(points []*client.Point) error - // Start the "service" that will provide an Output - Start() error - // Stop the "service" that will provide an Output - Stop() -} - -type Creator func() Output +type Creator func() telegraf.Output var Outputs = map[string]Creator{} diff --git a/plugins/outputs/riemann/riemann.go b/plugins/outputs/riemann/riemann.go index c1b22ec46..50c1555f7 100644 --- a/plugins/outputs/riemann/riemann.go +++ b/plugins/outputs/riemann/riemann.go @@ -8,6 +8,7 @@ import ( "github.com/amir/raidman" "github.com/influxdata/influxdb/client/v2" "github.com/influxdata/telegraf/plugins/outputs" + "github.com/influxdata/telegraf" ) type Riemann struct { @@ -95,7 +96,7 @@ func buildEvents(p *client.Point) []*raidman.Event { } func init() { - outputs.Add("riemann", func() outputs.Output { + outputs.Add("riemann", func() telegraf.Output { return &Riemann{} }) } diff --git a/testutil/accumulator.go b/testutil/accumulator.go index 7cdfb4155..05363e28c 100644 --- a/testutil/accumulator.go +++ b/testutil/accumulator.go @@ -84,23 +84,6 @@ func (a *Accumulator) AddFields( a.Points = append(a.Points, p) } -func (a *Accumulator) SetDefaultTags(tags map[string]string) { - // stub for implementing Accumulator interface. -} - -func (a *Accumulator) AddDefaultTag(key, value string) { - // stub for implementing Accumulator interface. -} - -func (a *Accumulator) Prefix() string { - // stub for implementing Accumulator interface. - return "" -} - -func (a *Accumulator) SetPrefix(prefix string) { - // stub for implementing Accumulator interface. -} - func (a *Accumulator) Debug() bool { // stub for implementing Accumulator interface. return a.debug