Create public models for telegraf metrics, accumlator, plugins
This will basically make the root directory a place for storing the major telegraf interfaces, which will make telegraf's godoc looks quite a bit nicer. And make it easier for contributors to lookup the few data types that they actually care about. closes #564
This commit is contained in:
parent
58e03547c9
commit
5364a20825
accumulator.go
agent
cmd/telegraf
input.gointernal
metric.gooutput.goplugins
inputs
aerospike
apache
bcache
disque
docker
elasticsearch
exec
github_webhooks
haproxy
httpjson
influxdb
jolokia
kafka_consumer
leofs
lustre2
mailchimp
memcached
mock_Plugin.gomongodb
mysql
nginx
nsq
passenger
phpfpm
ping
postgresql
procstat
prometheus
puppetagent
rabbitmq
redis
registry.gorethinkdb
sensors
snmp
sqlserver
statsd
system
trig
twemproxy
zfs
zookeeper
outputs
amon
amqp
cloudwatch
datadog
graphite
influxdb
kafka
kinesis
librato
mqtt
nsq
opentsdb
prometheus_client
registry.goriemann
testutil
191
accumulator.go
191
accumulator.go
|
@ -1,188 +1,21 @@
|
|||
package telegraf
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"math"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf/internal/models"
|
||||
|
||||
"github.com/influxdata/influxdb/client/v2"
|
||||
)
|
||||
import "time"
|
||||
|
||||
type Accumulator interface {
|
||||
Add(measurement string, value interface{},
|
||||
tags map[string]string, t ...time.Time)
|
||||
AddFields(measurement string, fields map[string]interface{},
|
||||
tags map[string]string, t ...time.Time)
|
||||
// Create a point with a value, decorating it with tags
|
||||
// NOTE: tags is expected to be owned by the caller, don't mutate
|
||||
// it after passing to Add.
|
||||
Add(measurement string,
|
||||
value interface{},
|
||||
tags map[string]string,
|
||||
t ...time.Time)
|
||||
|
||||
SetDefaultTags(tags map[string]string)
|
||||
AddDefaultTag(key, value string)
|
||||
|
||||
Prefix() string
|
||||
SetPrefix(prefix string)
|
||||
AddFields(measurement string,
|
||||
fields map[string]interface{},
|
||||
tags map[string]string,
|
||||
t ...time.Time)
|
||||
|
||||
Debug() bool
|
||||
SetDebug(enabled bool)
|
||||
}
|
||||
|
||||
func NewAccumulator(
|
||||
inputConfig *models.InputConfig,
|
||||
points chan *client.Point,
|
||||
) Accumulator {
|
||||
acc := accumulator{}
|
||||
acc.points = points
|
||||
acc.inputConfig = inputConfig
|
||||
return &acc
|
||||
}
|
||||
|
||||
type accumulator struct {
|
||||
sync.Mutex
|
||||
|
||||
points chan *client.Point
|
||||
|
||||
defaultTags map[string]string
|
||||
|
||||
debug bool
|
||||
|
||||
inputConfig *models.InputConfig
|
||||
|
||||
prefix string
|
||||
}
|
||||
|
||||
func (ac *accumulator) Add(
|
||||
measurement string,
|
||||
value interface{},
|
||||
tags map[string]string,
|
||||
t ...time.Time,
|
||||
) {
|
||||
fields := make(map[string]interface{})
|
||||
fields["value"] = value
|
||||
ac.AddFields(measurement, fields, tags, t...)
|
||||
}
|
||||
|
||||
func (ac *accumulator) AddFields(
|
||||
measurement string,
|
||||
fields map[string]interface{},
|
||||
tags map[string]string,
|
||||
t ...time.Time,
|
||||
) {
|
||||
if len(fields) == 0 || len(measurement) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
if !ac.inputConfig.Filter.ShouldTagsPass(tags) {
|
||||
return
|
||||
}
|
||||
|
||||
// Override measurement name if set
|
||||
if len(ac.inputConfig.NameOverride) != 0 {
|
||||
measurement = ac.inputConfig.NameOverride
|
||||
}
|
||||
// Apply measurement prefix and suffix if set
|
||||
if len(ac.inputConfig.MeasurementPrefix) != 0 {
|
||||
measurement = ac.inputConfig.MeasurementPrefix + measurement
|
||||
}
|
||||
if len(ac.inputConfig.MeasurementSuffix) != 0 {
|
||||
measurement = measurement + ac.inputConfig.MeasurementSuffix
|
||||
}
|
||||
|
||||
if tags == nil {
|
||||
tags = make(map[string]string)
|
||||
}
|
||||
// Apply plugin-wide tags if set
|
||||
for k, v := range ac.inputConfig.Tags {
|
||||
if _, ok := tags[k]; !ok {
|
||||
tags[k] = v
|
||||
}
|
||||
}
|
||||
// Apply daemon-wide tags if set
|
||||
for k, v := range ac.defaultTags {
|
||||
if _, ok := tags[k]; !ok {
|
||||
tags[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
result := make(map[string]interface{})
|
||||
for k, v := range fields {
|
||||
// Filter out any filtered fields
|
||||
if ac.inputConfig != nil {
|
||||
if !ac.inputConfig.Filter.ShouldPass(k) {
|
||||
continue
|
||||
}
|
||||
}
|
||||
result[k] = v
|
||||
|
||||
// Validate uint64 and float64 fields
|
||||
switch val := v.(type) {
|
||||
case uint64:
|
||||
// InfluxDB does not support writing uint64
|
||||
if val < uint64(9223372036854775808) {
|
||||
result[k] = int64(val)
|
||||
} else {
|
||||
result[k] = int64(9223372036854775807)
|
||||
}
|
||||
case float64:
|
||||
// NaNs are invalid values in influxdb, skip measurement
|
||||
if math.IsNaN(val) || math.IsInf(val, 0) {
|
||||
if ac.debug {
|
||||
log.Printf("Measurement [%s] field [%s] has a NaN or Inf "+
|
||||
"field, skipping",
|
||||
measurement, k)
|
||||
}
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
fields = nil
|
||||
if len(result) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
var timestamp time.Time
|
||||
if len(t) > 0 {
|
||||
timestamp = t[0]
|
||||
} else {
|
||||
timestamp = time.Now()
|
||||
}
|
||||
|
||||
if ac.prefix != "" {
|
||||
measurement = ac.prefix + measurement
|
||||
}
|
||||
|
||||
pt, err := client.NewPoint(measurement, tags, result, timestamp)
|
||||
if err != nil {
|
||||
log.Printf("Error adding point [%s]: %s\n", measurement, err.Error())
|
||||
return
|
||||
}
|
||||
if ac.debug {
|
||||
fmt.Println("> " + pt.String())
|
||||
}
|
||||
ac.points <- pt
|
||||
}
|
||||
|
||||
func (ac *accumulator) SetDefaultTags(tags map[string]string) {
|
||||
ac.defaultTags = tags
|
||||
}
|
||||
|
||||
func (ac *accumulator) AddDefaultTag(key, value string) {
|
||||
ac.defaultTags[key] = value
|
||||
}
|
||||
|
||||
func (ac *accumulator) Prefix() string {
|
||||
return ac.prefix
|
||||
}
|
||||
|
||||
func (ac *accumulator) SetPrefix(prefix string) {
|
||||
ac.prefix = prefix
|
||||
}
|
||||
|
||||
func (ac *accumulator) Debug() bool {
|
||||
return ac.debug
|
||||
}
|
||||
|
||||
func (ac *accumulator) SetDebug(debug bool) {
|
||||
ac.debug = debug
|
||||
}
|
||||
|
|
|
@ -0,0 +1,164 @@
|
|||
package agent
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"math"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf/internal/models"
|
||||
|
||||
"github.com/influxdata/influxdb/client/v2"
|
||||
)
|
||||
|
||||
func NewAccumulator(
|
||||
inputConfig *internal_models.InputConfig,
|
||||
points chan *client.Point,
|
||||
) *accumulator {
|
||||
acc := accumulator{}
|
||||
acc.points = points
|
||||
acc.inputConfig = inputConfig
|
||||
return &acc
|
||||
}
|
||||
|
||||
type accumulator struct {
|
||||
sync.Mutex
|
||||
|
||||
points chan *client.Point
|
||||
|
||||
defaultTags map[string]string
|
||||
|
||||
debug bool
|
||||
|
||||
inputConfig *internal_models.InputConfig
|
||||
|
||||
prefix string
|
||||
}
|
||||
|
||||
func (ac *accumulator) Add(
|
||||
measurement string,
|
||||
value interface{},
|
||||
tags map[string]string,
|
||||
t ...time.Time,
|
||||
) {
|
||||
fields := make(map[string]interface{})
|
||||
fields["value"] = value
|
||||
ac.AddFields(measurement, fields, tags, t...)
|
||||
}
|
||||
|
||||
func (ac *accumulator) AddFields(
|
||||
measurement string,
|
||||
fields map[string]interface{},
|
||||
tags map[string]string,
|
||||
t ...time.Time,
|
||||
) {
|
||||
if len(fields) == 0 || len(measurement) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
if !ac.inputConfig.Filter.ShouldTagsPass(tags) {
|
||||
return
|
||||
}
|
||||
|
||||
// Override measurement name if set
|
||||
if len(ac.inputConfig.NameOverride) != 0 {
|
||||
measurement = ac.inputConfig.NameOverride
|
||||
}
|
||||
// Apply measurement prefix and suffix if set
|
||||
if len(ac.inputConfig.MeasurementPrefix) != 0 {
|
||||
measurement = ac.inputConfig.MeasurementPrefix + measurement
|
||||
}
|
||||
if len(ac.inputConfig.MeasurementSuffix) != 0 {
|
||||
measurement = measurement + ac.inputConfig.MeasurementSuffix
|
||||
}
|
||||
|
||||
if tags == nil {
|
||||
tags = make(map[string]string)
|
||||
}
|
||||
// Apply plugin-wide tags if set
|
||||
for k, v := range ac.inputConfig.Tags {
|
||||
if _, ok := tags[k]; !ok {
|
||||
tags[k] = v
|
||||
}
|
||||
}
|
||||
// Apply daemon-wide tags if set
|
||||
for k, v := range ac.defaultTags {
|
||||
if _, ok := tags[k]; !ok {
|
||||
tags[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
result := make(map[string]interface{})
|
||||
for k, v := range fields {
|
||||
// Filter out any filtered fields
|
||||
if ac.inputConfig != nil {
|
||||
if !ac.inputConfig.Filter.ShouldPass(k) {
|
||||
continue
|
||||
}
|
||||
}
|
||||
result[k] = v
|
||||
|
||||
// Validate uint64 and float64 fields
|
||||
switch val := v.(type) {
|
||||
case uint64:
|
||||
// InfluxDB does not support writing uint64
|
||||
if val < uint64(9223372036854775808) {
|
||||
result[k] = int64(val)
|
||||
} else {
|
||||
result[k] = int64(9223372036854775807)
|
||||
}
|
||||
case float64:
|
||||
// NaNs are invalid values in influxdb, skip measurement
|
||||
if math.IsNaN(val) || math.IsInf(val, 0) {
|
||||
if ac.debug {
|
||||
log.Printf("Measurement [%s] field [%s] has a NaN or Inf "+
|
||||
"field, skipping",
|
||||
measurement, k)
|
||||
}
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
fields = nil
|
||||
if len(result) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
var timestamp time.Time
|
||||
if len(t) > 0 {
|
||||
timestamp = t[0]
|
||||
} else {
|
||||
timestamp = time.Now()
|
||||
}
|
||||
|
||||
if ac.prefix != "" {
|
||||
measurement = ac.prefix + measurement
|
||||
}
|
||||
|
||||
pt, err := client.NewPoint(measurement, tags, result, timestamp)
|
||||
if err != nil {
|
||||
log.Printf("Error adding point [%s]: %s\n", measurement, err.Error())
|
||||
return
|
||||
}
|
||||
if ac.debug {
|
||||
fmt.Println("> " + pt.String())
|
||||
}
|
||||
ac.points <- pt
|
||||
}
|
||||
|
||||
func (ac *accumulator) Debug() bool {
|
||||
return ac.debug
|
||||
}
|
||||
|
||||
func (ac *accumulator) SetDebug(debug bool) {
|
||||
ac.debug = debug
|
||||
}
|
||||
|
||||
func (ac *accumulator) setDefaultTags(tags map[string]string) {
|
||||
ac.defaultTags = tags
|
||||
}
|
||||
|
||||
func (ac *accumulator) addDefaultTag(key, value string) {
|
||||
ac.defaultTags[key] = value
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package telegraf
|
||||
package agent
|
||||
|
||||
import (
|
||||
cryptorand "crypto/rand"
|
||||
|
@ -11,10 +11,9 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/internal/config"
|
||||
"github.com/influxdata/telegraf/internal/models"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
"github.com/influxdata/telegraf/plugins/outputs"
|
||||
|
||||
"github.com/influxdata/influxdb/client/v2"
|
||||
)
|
||||
|
@ -48,7 +47,7 @@ func NewAgent(config *config.Config) (*Agent, error) {
|
|||
func (a *Agent) Connect() error {
|
||||
for _, o := range a.Config.Outputs {
|
||||
switch ot := o.Output.(type) {
|
||||
case outputs.ServiceOutput:
|
||||
case telegraf.ServiceOutput:
|
||||
if err := ot.Start(); err != nil {
|
||||
log.Printf("Service for output %s failed to start, exiting\n%s\n",
|
||||
o.Name, err.Error())
|
||||
|
@ -81,14 +80,14 @@ func (a *Agent) Close() error {
|
|||
for _, o := range a.Config.Outputs {
|
||||
err = o.Output.Close()
|
||||
switch ot := o.Output.(type) {
|
||||
case outputs.ServiceOutput:
|
||||
case telegraf.ServiceOutput:
|
||||
ot.Stop()
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func panicRecover(input *models.RunningInput) {
|
||||
func panicRecover(input *internal_models.RunningInput) {
|
||||
if err := recover(); err != nil {
|
||||
trace := make([]byte, 2048)
|
||||
runtime.Stack(trace, true)
|
||||
|
@ -115,13 +114,13 @@ func (a *Agent) gatherParallel(pointChan chan *client.Point) error {
|
|||
|
||||
wg.Add(1)
|
||||
counter++
|
||||
go func(input *models.RunningInput) {
|
||||
go func(input *internal_models.RunningInput) {
|
||||
defer panicRecover(input)
|
||||
defer wg.Done()
|
||||
|
||||
acc := NewAccumulator(input.Config, pointChan)
|
||||
acc.SetDebug(a.Config.Agent.Debug)
|
||||
acc.SetDefaultTags(a.Config.Tags)
|
||||
acc.setDefaultTags(a.Config.Tags)
|
||||
|
||||
if jitter != 0 {
|
||||
nanoSleep := rand.Int63n(jitter)
|
||||
|
@ -159,7 +158,7 @@ func (a *Agent) gatherParallel(pointChan chan *client.Point) error {
|
|||
// reporting interval.
|
||||
func (a *Agent) gatherSeparate(
|
||||
shutdown chan struct{},
|
||||
input *models.RunningInput,
|
||||
input *internal_models.RunningInput,
|
||||
pointChan chan *client.Point,
|
||||
) error {
|
||||
defer panicRecover(input)
|
||||
|
@ -172,7 +171,7 @@ func (a *Agent) gatherSeparate(
|
|||
|
||||
acc := NewAccumulator(input.Config, pointChan)
|
||||
acc.SetDebug(a.Config.Agent.Debug)
|
||||
acc.SetDefaultTags(a.Config.Tags)
|
||||
acc.setDefaultTags(a.Config.Tags)
|
||||
|
||||
if err := input.Input.Gather(acc); err != nil {
|
||||
log.Printf("Error in input [%s]: %s", input.Name, err)
|
||||
|
@ -250,7 +249,7 @@ func (a *Agent) flush() {
|
|||
|
||||
wg.Add(len(a.Config.Outputs))
|
||||
for _, o := range a.Config.Outputs {
|
||||
go func(output *models.RunningOutput) {
|
||||
go func(output *internal_models.RunningOutput) {
|
||||
defer wg.Done()
|
||||
err := output.Write()
|
||||
if err != nil {
|
||||
|
@ -344,7 +343,7 @@ func (a *Agent) Run(shutdown chan struct{}) error {
|
|||
|
||||
// Start service of any ServicePlugins
|
||||
switch p := input.Input.(type) {
|
||||
case inputs.ServiceInput:
|
||||
case telegraf.ServiceInput:
|
||||
if err := p.Start(); err != nil {
|
||||
log.Printf("Service for input %s failed to start, exiting\n%s\n",
|
||||
input.Name, err.Error())
|
||||
|
@ -357,7 +356,7 @@ func (a *Agent) Run(shutdown chan struct{}) error {
|
|||
// configured. Default intervals are handled below with gatherParallel
|
||||
if input.Config.Interval != 0 {
|
||||
wg.Add(1)
|
||||
go func(input *models.RunningInput) {
|
||||
go func(input *internal_models.RunningInput) {
|
||||
defer wg.Done()
|
||||
if err := a.gatherSeparate(shutdown, input, pointChan); err != nil {
|
||||
log.Printf(err.Error())
|
|
@ -1,4 +1,4 @@
|
|||
package telegraf
|
||||
package agent
|
||||
|
||||
import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
@ -16,35 +16,35 @@ import (
|
|||
func TestAgent_LoadPlugin(t *testing.T) {
|
||||
c := config.NewConfig()
|
||||
c.InputFilters = []string{"mysql"}
|
||||
err := c.LoadConfig("./internal/config/testdata/telegraf-agent.toml")
|
||||
err := c.LoadConfig("../internal/config/testdata/telegraf-agent.toml")
|
||||
assert.NoError(t, err)
|
||||
a, _ := NewAgent(c)
|
||||
assert.Equal(t, 1, len(a.Config.Inputs))
|
||||
|
||||
c = config.NewConfig()
|
||||
c.InputFilters = []string{"foo"}
|
||||
err = c.LoadConfig("./internal/config/testdata/telegraf-agent.toml")
|
||||
err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml")
|
||||
assert.NoError(t, err)
|
||||
a, _ = NewAgent(c)
|
||||
assert.Equal(t, 0, len(a.Config.Inputs))
|
||||
|
||||
c = config.NewConfig()
|
||||
c.InputFilters = []string{"mysql", "foo"}
|
||||
err = c.LoadConfig("./internal/config/testdata/telegraf-agent.toml")
|
||||
err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml")
|
||||
assert.NoError(t, err)
|
||||
a, _ = NewAgent(c)
|
||||
assert.Equal(t, 1, len(a.Config.Inputs))
|
||||
|
||||
c = config.NewConfig()
|
||||
c.InputFilters = []string{"mysql", "redis"}
|
||||
err = c.LoadConfig("./internal/config/testdata/telegraf-agent.toml")
|
||||
err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml")
|
||||
assert.NoError(t, err)
|
||||
a, _ = NewAgent(c)
|
||||
assert.Equal(t, 2, len(a.Config.Inputs))
|
||||
|
||||
c = config.NewConfig()
|
||||
c.InputFilters = []string{"mysql", "foo", "redis", "bar"}
|
||||
err = c.LoadConfig("./internal/config/testdata/telegraf-agent.toml")
|
||||
err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml")
|
||||
assert.NoError(t, err)
|
||||
a, _ = NewAgent(c)
|
||||
assert.Equal(t, 2, len(a.Config.Inputs))
|
||||
|
@ -53,42 +53,42 @@ func TestAgent_LoadPlugin(t *testing.T) {
|
|||
func TestAgent_LoadOutput(t *testing.T) {
|
||||
c := config.NewConfig()
|
||||
c.OutputFilters = []string{"influxdb"}
|
||||
err := c.LoadConfig("./internal/config/testdata/telegraf-agent.toml")
|
||||
err := c.LoadConfig("../internal/config/testdata/telegraf-agent.toml")
|
||||
assert.NoError(t, err)
|
||||
a, _ := NewAgent(c)
|
||||
assert.Equal(t, 2, len(a.Config.Outputs))
|
||||
|
||||
c = config.NewConfig()
|
||||
c.OutputFilters = []string{"kafka"}
|
||||
err = c.LoadConfig("./internal/config/testdata/telegraf-agent.toml")
|
||||
err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml")
|
||||
assert.NoError(t, err)
|
||||
a, _ = NewAgent(c)
|
||||
assert.Equal(t, 1, len(a.Config.Outputs))
|
||||
|
||||
c = config.NewConfig()
|
||||
c.OutputFilters = []string{}
|
||||
err = c.LoadConfig("./internal/config/testdata/telegraf-agent.toml")
|
||||
err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml")
|
||||
assert.NoError(t, err)
|
||||
a, _ = NewAgent(c)
|
||||
assert.Equal(t, 3, len(a.Config.Outputs))
|
||||
|
||||
c = config.NewConfig()
|
||||
c.OutputFilters = []string{"foo"}
|
||||
err = c.LoadConfig("./internal/config/testdata/telegraf-agent.toml")
|
||||
err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml")
|
||||
assert.NoError(t, err)
|
||||
a, _ = NewAgent(c)
|
||||
assert.Equal(t, 0, len(a.Config.Outputs))
|
||||
|
||||
c = config.NewConfig()
|
||||
c.OutputFilters = []string{"influxdb", "foo"}
|
||||
err = c.LoadConfig("./internal/config/testdata/telegraf-agent.toml")
|
||||
err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml")
|
||||
assert.NoError(t, err)
|
||||
a, _ = NewAgent(c)
|
||||
assert.Equal(t, 2, len(a.Config.Outputs))
|
||||
|
||||
c = config.NewConfig()
|
||||
c.OutputFilters = []string{"influxdb", "kafka"}
|
||||
err = c.LoadConfig("./internal/config/testdata/telegraf-agent.toml")
|
||||
err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 3, len(c.Outputs))
|
||||
a, _ = NewAgent(c)
|
||||
|
@ -96,7 +96,7 @@ func TestAgent_LoadOutput(t *testing.T) {
|
|||
|
||||
c = config.NewConfig()
|
||||
c.OutputFilters = []string{"influxdb", "foo", "kafka", "bar"}
|
||||
err = c.LoadConfig("./internal/config/testdata/telegraf-agent.toml")
|
||||
err = c.LoadConfig("../internal/config/testdata/telegraf-agent.toml")
|
||||
assert.NoError(t, err)
|
||||
a, _ = NewAgent(c)
|
||||
assert.Equal(t, 3, len(a.Config.Outputs))
|
|
@ -9,7 +9,7 @@ import (
|
|||
"strings"
|
||||
"syscall"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/agent"
|
||||
"github.com/influxdata/telegraf/internal/config"
|
||||
_ "github.com/influxdata/telegraf/plugins/inputs/all"
|
||||
_ "github.com/influxdata/telegraf/plugins/outputs/all"
|
||||
|
@ -173,7 +173,7 @@ func main() {
|
|||
log.Fatalf("Error: no inputs found, did you provide a valid config file?")
|
||||
}
|
||||
|
||||
ag, err := telegraf.NewAgent(c)
|
||||
ag, err := agent.NewAgent(c)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,31 @@
|
|||
package telegraf
|
||||
|
||||
type Input interface {
|
||||
// SampleConfig returns the default configuration of the Input
|
||||
SampleConfig() string
|
||||
|
||||
// Description returns a one-sentence description on the Input
|
||||
Description() string
|
||||
|
||||
// Gather takes in an accumulator and adds the metrics that the Input
|
||||
// gathers. This is called every "interval"
|
||||
Gather(Accumulator) error
|
||||
}
|
||||
|
||||
type ServiceInput interface {
|
||||
// SampleConfig returns the default configuration of the Input
|
||||
SampleConfig() string
|
||||
|
||||
// Description returns a one-sentence description on the Input
|
||||
Description() string
|
||||
|
||||
// Gather takes in an accumulator and adds the metrics that the Input
|
||||
// gathers. This is called every "interval"
|
||||
Gather(Accumulator) error
|
||||
|
||||
// Start starts the ServiceInput's service, whatever that may be
|
||||
Start() error
|
||||
|
||||
// Stop stops the services and closes any necessary channels and connections
|
||||
Stop()
|
||||
}
|
|
@ -10,6 +10,7 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
"github.com/influxdata/telegraf/internal/models"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
|
@ -28,8 +29,8 @@ type Config struct {
|
|||
OutputFilters []string
|
||||
|
||||
Agent *AgentConfig
|
||||
Inputs []*models.RunningInput
|
||||
Outputs []*models.RunningOutput
|
||||
Inputs []*internal_models.RunningInput
|
||||
Outputs []*internal_models.RunningOutput
|
||||
}
|
||||
|
||||
func NewConfig() *Config {
|
||||
|
@ -43,8 +44,8 @@ func NewConfig() *Config {
|
|||
},
|
||||
|
||||
Tags: make(map[string]string),
|
||||
Inputs: make([]*models.RunningInput, 0),
|
||||
Outputs: make([]*models.RunningOutput, 0),
|
||||
Inputs: make([]*internal_models.RunningInput, 0),
|
||||
Outputs: make([]*internal_models.RunningOutput, 0),
|
||||
InputFilters: make([]string, 0),
|
||||
OutputFilters: make([]string, 0),
|
||||
}
|
||||
|
@ -227,13 +228,13 @@ func PrintSampleConfig(pluginFilters []string, outputFilters []string) {
|
|||
|
||||
// Print Inputs
|
||||
fmt.Printf(pluginHeader)
|
||||
servInputs := make(map[string]inputs.ServiceInput)
|
||||
servInputs := make(map[string]telegraf.ServiceInput)
|
||||
for _, pname := range pnames {
|
||||
creator := inputs.Inputs[pname]
|
||||
input := creator()
|
||||
|
||||
switch p := input.(type) {
|
||||
case inputs.ServiceInput:
|
||||
case telegraf.ServiceInput:
|
||||
servInputs[pname] = p
|
||||
continue
|
||||
}
|
||||
|
@ -403,7 +404,7 @@ func (c *Config) addOutput(name string, table *ast.Table) error {
|
|||
return err
|
||||
}
|
||||
|
||||
ro := models.NewRunningOutput(name, output, outputConfig)
|
||||
ro := internal_models.NewRunningOutput(name, output, outputConfig)
|
||||
if c.Agent.MetricBufferLimit > 0 {
|
||||
ro.PointBufferLimit = c.Agent.MetricBufferLimit
|
||||
}
|
||||
|
@ -436,7 +437,7 @@ func (c *Config) addInput(name string, table *ast.Table) error {
|
|||
return err
|
||||
}
|
||||
|
||||
rp := &models.RunningInput{
|
||||
rp := &internal_models.RunningInput{
|
||||
Name: name,
|
||||
Input: input,
|
||||
Config: pluginConfig,
|
||||
|
@ -446,10 +447,10 @@ func (c *Config) addInput(name string, table *ast.Table) error {
|
|||
}
|
||||
|
||||
// buildFilter builds a Filter (tagpass/tagdrop/pass/drop) to
|
||||
// be inserted into the models.OutputConfig/models.InputConfig to be used for prefix
|
||||
// be inserted into the internal_models.OutputConfig/internal_models.InputConfig to be used for prefix
|
||||
// filtering on tags and measurements
|
||||
func buildFilter(tbl *ast.Table) models.Filter {
|
||||
f := models.Filter{}
|
||||
func buildFilter(tbl *ast.Table) internal_models.Filter {
|
||||
f := internal_models.Filter{}
|
||||
|
||||
if node, ok := tbl.Fields["pass"]; ok {
|
||||
if kv, ok := node.(*ast.KeyValue); ok {
|
||||
|
@ -481,7 +482,7 @@ func buildFilter(tbl *ast.Table) models.Filter {
|
|||
if subtbl, ok := node.(*ast.Table); ok {
|
||||
for name, val := range subtbl.Fields {
|
||||
if kv, ok := val.(*ast.KeyValue); ok {
|
||||
tagfilter := &models.TagFilter{Name: name}
|
||||
tagfilter := &internal_models.TagFilter{Name: name}
|
||||
if ary, ok := kv.Value.(*ast.Array); ok {
|
||||
for _, elem := range ary.Value {
|
||||
if str, ok := elem.(*ast.String); ok {
|
||||
|
@ -500,7 +501,7 @@ func buildFilter(tbl *ast.Table) models.Filter {
|
|||
if subtbl, ok := node.(*ast.Table); ok {
|
||||
for name, val := range subtbl.Fields {
|
||||
if kv, ok := val.(*ast.KeyValue); ok {
|
||||
tagfilter := &models.TagFilter{Name: name}
|
||||
tagfilter := &internal_models.TagFilter{Name: name}
|
||||
if ary, ok := kv.Value.(*ast.Array); ok {
|
||||
for _, elem := range ary.Value {
|
||||
if str, ok := elem.(*ast.String); ok {
|
||||
|
@ -524,9 +525,9 @@ func buildFilter(tbl *ast.Table) models.Filter {
|
|||
|
||||
// buildInput parses input specific items from the ast.Table,
|
||||
// builds the filter and returns a
|
||||
// models.InputConfig to be inserted into models.RunningInput
|
||||
func buildInput(name string, tbl *ast.Table) (*models.InputConfig, error) {
|
||||
cp := &models.InputConfig{Name: name}
|
||||
// internal_models.InputConfig to be inserted into internal_models.RunningInput
|
||||
func buildInput(name string, tbl *ast.Table) (*internal_models.InputConfig, error) {
|
||||
cp := &internal_models.InputConfig{Name: name}
|
||||
if node, ok := tbl.Fields["interval"]; ok {
|
||||
if kv, ok := node.(*ast.KeyValue); ok {
|
||||
if str, ok := kv.Value.(*ast.String); ok {
|
||||
|
@ -583,10 +584,10 @@ func buildInput(name string, tbl *ast.Table) (*models.InputConfig, error) {
|
|||
}
|
||||
|
||||
// buildOutput parses output specific items from the ast.Table, builds the filter and returns an
|
||||
// models.OutputConfig to be inserted into models.RunningInput
|
||||
// internal_models.OutputConfig to be inserted into internal_models.RunningInput
|
||||
// Note: error exists in the return for future calls that might require error
|
||||
func buildOutput(name string, tbl *ast.Table) (*models.OutputConfig, error) {
|
||||
oc := &models.OutputConfig{
|
||||
func buildOutput(name string, tbl *ast.Table) (*internal_models.OutputConfig, error) {
|
||||
oc := &internal_models.OutputConfig{
|
||||
Name: name,
|
||||
Filter: buildFilter(tbl),
|
||||
}
|
||||
|
|
|
@ -19,19 +19,19 @@ func TestConfig_LoadSingleInput(t *testing.T) {
|
|||
memcached := inputs.Inputs["memcached"]().(*memcached.Memcached)
|
||||
memcached.Servers = []string{"localhost"}
|
||||
|
||||
mConfig := &models.InputConfig{
|
||||
mConfig := &internal_models.InputConfig{
|
||||
Name: "memcached",
|
||||
Filter: models.Filter{
|
||||
Filter: internal_models.Filter{
|
||||
Drop: []string{"other", "stuff"},
|
||||
Pass: []string{"some", "strings"},
|
||||
TagDrop: []models.TagFilter{
|
||||
models.TagFilter{
|
||||
TagDrop: []internal_models.TagFilter{
|
||||
internal_models.TagFilter{
|
||||
Name: "badtag",
|
||||
Filter: []string{"othertag"},
|
||||
},
|
||||
},
|
||||
TagPass: []models.TagFilter{
|
||||
models.TagFilter{
|
||||
TagPass: []internal_models.TagFilter{
|
||||
internal_models.TagFilter{
|
||||
Name: "goodtag",
|
||||
Filter: []string{"mytag"},
|
||||
},
|
||||
|
@ -62,19 +62,19 @@ func TestConfig_LoadDirectory(t *testing.T) {
|
|||
memcached := inputs.Inputs["memcached"]().(*memcached.Memcached)
|
||||
memcached.Servers = []string{"localhost"}
|
||||
|
||||
mConfig := &models.InputConfig{
|
||||
mConfig := &internal_models.InputConfig{
|
||||
Name: "memcached",
|
||||
Filter: models.Filter{
|
||||
Filter: internal_models.Filter{
|
||||
Drop: []string{"other", "stuff"},
|
||||
Pass: []string{"some", "strings"},
|
||||
TagDrop: []models.TagFilter{
|
||||
models.TagFilter{
|
||||
TagDrop: []internal_models.TagFilter{
|
||||
internal_models.TagFilter{
|
||||
Name: "badtag",
|
||||
Filter: []string{"othertag"},
|
||||
},
|
||||
},
|
||||
TagPass: []models.TagFilter{
|
||||
models.TagFilter{
|
||||
TagPass: []internal_models.TagFilter{
|
||||
internal_models.TagFilter{
|
||||
Name: "goodtag",
|
||||
Filter: []string{"mytag"},
|
||||
},
|
||||
|
@ -92,7 +92,7 @@ func TestConfig_LoadDirectory(t *testing.T) {
|
|||
|
||||
ex := inputs.Inputs["exec"]().(*exec.Exec)
|
||||
ex.Command = "/usr/bin/myothercollector --foo=bar"
|
||||
eConfig := &models.InputConfig{
|
||||
eConfig := &internal_models.InputConfig{
|
||||
Name: "exec",
|
||||
MeasurementSuffix: "_myothercollector",
|
||||
}
|
||||
|
@ -111,7 +111,7 @@ func TestConfig_LoadDirectory(t *testing.T) {
|
|||
pstat := inputs.Inputs["procstat"]().(*procstat.Procstat)
|
||||
pstat.PidFile = "/var/run/grafana-server.pid"
|
||||
|
||||
pConfig := &models.InputConfig{Name: "procstat"}
|
||||
pConfig := &internal_models.InputConfig{Name: "procstat"}
|
||||
pConfig.Tags = make(map[string]string)
|
||||
|
||||
assert.Equal(t, pstat, c.Inputs[3].Input,
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
package models
|
||||
package internal_models
|
||||
|
||||
import (
|
||||
"strings"
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
package models
|
||||
package internal_models
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
|
|
@ -1,14 +1,14 @@
|
|||
package models
|
||||
package internal_models
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
"github.com/influxdata/telegraf"
|
||||
)
|
||||
|
||||
type RunningInput struct {
|
||||
Name string
|
||||
Input inputs.Input
|
||||
Input telegraf.Input
|
||||
Config *InputConfig
|
||||
}
|
||||
|
||||
|
|
|
@ -1,10 +1,10 @@
|
|||
package models
|
||||
package internal_models
|
||||
|
||||
import (
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf/plugins/outputs"
|
||||
"github.com/influxdata/telegraf"
|
||||
|
||||
"github.com/influxdata/influxdb/client/v2"
|
||||
)
|
||||
|
@ -13,7 +13,7 @@ const DEFAULT_POINT_BUFFER_LIMIT = 10000
|
|||
|
||||
type RunningOutput struct {
|
||||
Name string
|
||||
Output outputs.Output
|
||||
Output telegraf.Output
|
||||
Config *OutputConfig
|
||||
Quiet bool
|
||||
PointBufferLimit int
|
||||
|
@ -24,7 +24,7 @@ type RunningOutput struct {
|
|||
|
||||
func NewRunningOutput(
|
||||
name string,
|
||||
output outputs.Output,
|
||||
output telegraf.Output,
|
||||
conf *OutputConfig,
|
||||
) *RunningOutput {
|
||||
ro := &RunningOutput{
|
||||
|
|
|
@ -0,0 +1,112 @@
|
|||
package telegraf
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb/client/v2"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
)
|
||||
|
||||
type Metric interface {
|
||||
// Name returns the measurement name of the metric
|
||||
Name() string
|
||||
|
||||
// Name returns the tags associated with the metric
|
||||
Tags() map[string]string
|
||||
|
||||
// Time return the timestamp for the metric
|
||||
Time() time.Time
|
||||
|
||||
// UnixNano returns the unix nano time of the metric
|
||||
UnixNano() int64
|
||||
|
||||
// Fields returns the fields for the metric
|
||||
Fields() map[string]interface{}
|
||||
|
||||
// String returns a line-protocol string of the metric
|
||||
String() string
|
||||
|
||||
// PrecisionString returns a line-protocol string of the metric, at precision
|
||||
PrecisionString(precison string) string
|
||||
|
||||
// Point returns a influxdb client.Point object
|
||||
Point() *client.Point
|
||||
}
|
||||
|
||||
// metric is a wrapper of the influxdb client.Point struct
|
||||
type metric struct {
|
||||
pt *client.Point
|
||||
}
|
||||
|
||||
// NewMetric returns a metric with the given timestamp. If a timestamp is not
|
||||
// given, then data is sent to the database without a timestamp, in which case
|
||||
// the server will assign local time upon reception. NOTE: it is recommended to
|
||||
// send data with a timestamp.
|
||||
func NewMetric(
|
||||
name string,
|
||||
tags map[string]string,
|
||||
fields map[string]interface{},
|
||||
t ...time.Time,
|
||||
) (Metric, error) {
|
||||
var T time.Time
|
||||
if len(t) > 0 {
|
||||
T = t[0]
|
||||
}
|
||||
|
||||
pt, err := client.NewPoint(name, tags, fields, T)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &metric{
|
||||
pt: pt,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// ParseMetrics returns a slice of Metrics from a text representation of a
|
||||
// metric (in line-protocol format)
|
||||
// with each metric separated by newlines. If any metrics fail to parse,
|
||||
// a non-nil error will be returned in addition to the metrics that parsed
|
||||
// successfully.
|
||||
func ParseMetrics(buf []byte) ([]Metric, error) {
|
||||
points, err := models.ParsePoints(buf)
|
||||
metrics := make([]Metric, len(points))
|
||||
for i, point := range points {
|
||||
// Ignore error here because it's impossible that a model.Point
|
||||
// wouldn't parse into client.Point properly
|
||||
metrics[i], _ = NewMetric(point.Name(), point.Tags(),
|
||||
point.Fields(), point.Time())
|
||||
}
|
||||
return metrics, err
|
||||
}
|
||||
|
||||
func (m *metric) Name() string {
|
||||
return m.pt.Name()
|
||||
}
|
||||
|
||||
func (m *metric) Tags() map[string]string {
|
||||
return m.pt.Tags()
|
||||
}
|
||||
|
||||
func (m *metric) Time() time.Time {
|
||||
return m.pt.Time()
|
||||
}
|
||||
|
||||
func (m *metric) UnixNano() int64 {
|
||||
return m.pt.UnixNano()
|
||||
}
|
||||
|
||||
func (m *metric) Fields() map[string]interface{} {
|
||||
return m.pt.Fields()
|
||||
}
|
||||
|
||||
func (m *metric) String() string {
|
||||
return m.pt.String()
|
||||
}
|
||||
|
||||
func (m *metric) PrecisionString(precison string) string {
|
||||
return m.pt.PrecisionString(precison)
|
||||
}
|
||||
|
||||
func (m *metric) Point() *client.Point {
|
||||
return m.pt
|
||||
}
|
|
@ -0,0 +1,63 @@
|
|||
package telegraf
|
||||
|
||||
import "github.com/influxdata/influxdb/client/v2"
|
||||
|
||||
// type Output interface {
|
||||
// // Connect to the Output
|
||||
// Connect() error
|
||||
// // Close any connections to the Output
|
||||
// Close() error
|
||||
// // Description returns a one-sentence description on the Output
|
||||
// Description() string
|
||||
// // SampleConfig returns the default configuration of the Output
|
||||
// SampleConfig() string
|
||||
// // Write takes in group of points to be written to the Output
|
||||
// Write(metrics []Metric) error
|
||||
// }
|
||||
|
||||
// type ServiceOutput interface {
|
||||
// // Connect to the Output
|
||||
// Connect() error
|
||||
// // Close any connections to the Output
|
||||
// Close() error
|
||||
// // Description returns a one-sentence description on the Output
|
||||
// Description() string
|
||||
// // SampleConfig returns the default configuration of the Output
|
||||
// SampleConfig() string
|
||||
// // Write takes in group of points to be written to the Output
|
||||
// Write(metrics []Metric) error
|
||||
// // Start the "service" that will provide an Output
|
||||
// Start() error
|
||||
// // Stop the "service" that will provide an Output
|
||||
// Stop()
|
||||
// }
|
||||
|
||||
type Output interface {
|
||||
// Connect to the Output
|
||||
Connect() error
|
||||
// Close any connections to the Output
|
||||
Close() error
|
||||
// Description returns a one-sentence description on the Output
|
||||
Description() string
|
||||
// SampleConfig returns the default configuration of the Output
|
||||
SampleConfig() string
|
||||
// Write takes in group of points to be written to the Output
|
||||
Write(points []*client.Point) error
|
||||
}
|
||||
|
||||
type ServiceOutput interface {
|
||||
// Connect to the Output
|
||||
Connect() error
|
||||
// Close any connections to the Output
|
||||
Close() error
|
||||
// Description returns a one-sentence description on the Output
|
||||
Description() string
|
||||
// SampleConfig returns the default configuration of the Output
|
||||
SampleConfig() string
|
||||
// Write takes in group of points to be written to the Output
|
||||
Write(points []*client.Point) error
|
||||
// Start the "service" that will provide an Output
|
||||
Start() error
|
||||
// Stop the "service" that will provide an Output
|
||||
Stop()
|
||||
}
|
|
@ -4,6 +4,7 @@ import (
|
|||
"bytes"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
"net"
|
||||
"strconv"
|
||||
|
@ -119,7 +120,7 @@ func (a *Aerospike) Description() string {
|
|||
return "Read stats from an aerospike server"
|
||||
}
|
||||
|
||||
func (a *Aerospike) Gather(acc inputs.Accumulator) error {
|
||||
func (a *Aerospike) Gather(acc telegraf.Accumulator) error {
|
||||
if len(a.Servers) == 0 {
|
||||
return a.gatherServer("127.0.0.1:3000", acc)
|
||||
}
|
||||
|
@ -140,7 +141,7 @@ func (a *Aerospike) Gather(acc inputs.Accumulator) error {
|
|||
return outerr
|
||||
}
|
||||
|
||||
func (a *Aerospike) gatherServer(host string, acc inputs.Accumulator) error {
|
||||
func (a *Aerospike) gatherServer(host string, acc telegraf.Accumulator) error {
|
||||
aerospikeInfo, err := getMap(STATISTICS_COMMAND, host)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Aerospike info failed: %s", err)
|
||||
|
@ -249,7 +250,7 @@ func get(key []byte, host string) (map[string]string, error) {
|
|||
|
||||
func readAerospikeStats(
|
||||
stats map[string]string,
|
||||
acc inputs.Accumulator,
|
||||
acc telegraf.Accumulator,
|
||||
host string,
|
||||
namespace string,
|
||||
) {
|
||||
|
@ -336,7 +337,7 @@ func msgLenFromBytes(buf [6]byte) int64 {
|
|||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("aerospike", func() inputs.Input {
|
||||
inputs.Add("aerospike", func() telegraf.Input {
|
||||
return &Aerospike{}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
)
|
||||
|
||||
|
@ -31,7 +32,7 @@ func (n *Apache) Description() string {
|
|||
return "Read Apache status information (mod_status)"
|
||||
}
|
||||
|
||||
func (n *Apache) Gather(acc inputs.Accumulator) error {
|
||||
func (n *Apache) Gather(acc telegraf.Accumulator) error {
|
||||
var wg sync.WaitGroup
|
||||
var outerr error
|
||||
|
||||
|
@ -59,7 +60,7 @@ var tr = &http.Transport{
|
|||
|
||||
var client = &http.Client{Transport: tr}
|
||||
|
||||
func (n *Apache) gatherUrl(addr *url.URL, acc inputs.Accumulator) error {
|
||||
func (n *Apache) gatherUrl(addr *url.URL, acc telegraf.Accumulator) error {
|
||||
resp, err := client.Get(addr.String())
|
||||
if err != nil {
|
||||
return fmt.Errorf("error making HTTP request to %s: %s", addr.String(), err)
|
||||
|
@ -164,7 +165,7 @@ func getTags(addr *url.URL) map[string]string {
|
|||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("apache", func() inputs.Input {
|
||||
inputs.Add("apache", func() telegraf.Input {
|
||||
return &Apache{}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
)
|
||||
|
||||
|
@ -69,7 +70,7 @@ func prettyToBytes(v string) uint64 {
|
|||
return uint64(result)
|
||||
}
|
||||
|
||||
func (b *Bcache) gatherBcache(bdev string, acc inputs.Accumulator) error {
|
||||
func (b *Bcache) gatherBcache(bdev string, acc telegraf.Accumulator) error {
|
||||
tags := getTags(bdev)
|
||||
metrics, err := filepath.Glob(bdev + "/stats_total/*")
|
||||
if len(metrics) < 0 {
|
||||
|
@ -104,7 +105,7 @@ func (b *Bcache) gatherBcache(bdev string, acc inputs.Accumulator) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (b *Bcache) Gather(acc inputs.Accumulator) error {
|
||||
func (b *Bcache) Gather(acc telegraf.Accumulator) error {
|
||||
bcacheDevsChecked := make(map[string]bool)
|
||||
var restrictDevs bool
|
||||
if len(b.BcacheDevs) != 0 {
|
||||
|
@ -135,7 +136,7 @@ func (b *Bcache) Gather(acc inputs.Accumulator) error {
|
|||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("bcache", func() inputs.Input {
|
||||
inputs.Add("bcache", func() telegraf.Input {
|
||||
return &Bcache{}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
)
|
||||
|
||||
|
@ -61,7 +62,7 @@ var ErrProtocolError = errors.New("disque protocol error")
|
|||
|
||||
// Reads stats from all configured servers accumulates stats.
|
||||
// Returns one of the errors encountered while gather stats (if any).
|
||||
func (g *Disque) Gather(acc inputs.Accumulator) error {
|
||||
func (g *Disque) Gather(acc telegraf.Accumulator) error {
|
||||
if len(g.Servers) == 0 {
|
||||
url := &url.URL{
|
||||
Host: ":7711",
|
||||
|
@ -98,7 +99,7 @@ func (g *Disque) Gather(acc inputs.Accumulator) error {
|
|||
|
||||
const defaultPort = "7711"
|
||||
|
||||
func (g *Disque) gatherServer(addr *url.URL, acc inputs.Accumulator) error {
|
||||
func (g *Disque) gatherServer(addr *url.URL, acc telegraf.Accumulator) error {
|
||||
if g.c == nil {
|
||||
|
||||
_, _, err := net.SplitHostPort(addr.Host)
|
||||
|
@ -198,7 +199,7 @@ func (g *Disque) gatherServer(addr *url.URL, acc inputs.Accumulator) error {
|
|||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("disque", func() inputs.Input {
|
||||
inputs.Add("disque", func() telegraf.Input {
|
||||
return &Disque{}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
|
||||
"github.com/fsouza/go-dockerclient"
|
||||
|
@ -33,7 +34,7 @@ func (d *Docker) Description() string {
|
|||
|
||||
func (d *Docker) SampleConfig() string { return sampleConfig }
|
||||
|
||||
func (d *Docker) Gather(acc inputs.Accumulator) error {
|
||||
func (d *Docker) Gather(acc telegraf.Accumulator) error {
|
||||
if d.client == nil {
|
||||
var c *docker.Client
|
||||
var err error
|
||||
|
@ -80,7 +81,7 @@ func (d *Docker) Gather(acc inputs.Accumulator) error {
|
|||
|
||||
func (d *Docker) gatherContainer(
|
||||
container docker.APIContainers,
|
||||
acc inputs.Accumulator,
|
||||
acc telegraf.Accumulator,
|
||||
) error {
|
||||
// Parse container name
|
||||
cname := "unknown"
|
||||
|
@ -129,7 +130,7 @@ func (d *Docker) gatherContainer(
|
|||
|
||||
func gatherContainerStats(
|
||||
stat *docker.Stats,
|
||||
acc inputs.Accumulator,
|
||||
acc telegraf.Accumulator,
|
||||
tags map[string]string,
|
||||
) {
|
||||
now := stat.Read
|
||||
|
@ -212,7 +213,7 @@ func gatherContainerStats(
|
|||
|
||||
func gatherBlockIOMetrics(
|
||||
stat *docker.Stats,
|
||||
acc inputs.Accumulator,
|
||||
acc telegraf.Accumulator,
|
||||
tags map[string]string,
|
||||
now time.Time,
|
||||
) {
|
||||
|
@ -303,7 +304,7 @@ func sliceContains(in string, sl []string) bool {
|
|||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("docker", func() inputs.Input {
|
||||
inputs.Add("docker", func() telegraf.Input {
|
||||
return &Docker{}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
)
|
||||
|
@ -95,13 +96,13 @@ func (e *Elasticsearch) Description() string {
|
|||
|
||||
// Gather reads the stats from Elasticsearch and writes it to the
|
||||
// Accumulator.
|
||||
func (e *Elasticsearch) Gather(acc inputs.Accumulator) error {
|
||||
func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
|
||||
errChan := make(chan error, len(e.Servers))
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(len(e.Servers))
|
||||
|
||||
for _, serv := range e.Servers {
|
||||
go func(s string, acc inputs.Accumulator) {
|
||||
go func(s string, acc telegraf.Accumulator) {
|
||||
defer wg.Done()
|
||||
var url string
|
||||
if e.Local {
|
||||
|
@ -133,7 +134,7 @@ func (e *Elasticsearch) Gather(acc inputs.Accumulator) error {
|
|||
return errors.New(strings.Join(errStrings, "\n"))
|
||||
}
|
||||
|
||||
func (e *Elasticsearch) gatherNodeStats(url string, acc inputs.Accumulator) error {
|
||||
func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) error {
|
||||
nodeStats := &struct {
|
||||
ClusterName string `json:"cluster_name"`
|
||||
Nodes map[string]*node `json:"nodes"`
|
||||
|
@ -178,7 +179,7 @@ func (e *Elasticsearch) gatherNodeStats(url string, acc inputs.Accumulator) erro
|
|||
return nil
|
||||
}
|
||||
|
||||
func (e *Elasticsearch) gatherClusterStats(url string, acc inputs.Accumulator) error {
|
||||
func (e *Elasticsearch) gatherClusterStats(url string, acc telegraf.Accumulator) error {
|
||||
clusterStats := &clusterHealth{}
|
||||
if err := e.gatherData(url, clusterStats); err != nil {
|
||||
return err
|
||||
|
@ -243,7 +244,7 @@ func (e *Elasticsearch) gatherData(url string, v interface{}) error {
|
|||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("elasticsearch", func() inputs.Input {
|
||||
inputs.Add("elasticsearch", func() telegraf.Input {
|
||||
return NewElasticsearch()
|
||||
})
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
|
||||
"github.com/gonuts/go-shellquote"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
)
|
||||
|
@ -64,7 +65,7 @@ func (e *Exec) Description() string {
|
|||
return "Read flattened metrics from one or more commands that output JSON to stdout"
|
||||
}
|
||||
|
||||
func (e *Exec) Gather(acc inputs.Accumulator) error {
|
||||
func (e *Exec) Gather(acc telegraf.Accumulator) error {
|
||||
out, err := e.runner.Run(e)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -88,7 +89,7 @@ func (e *Exec) Gather(acc inputs.Accumulator) error {
|
|||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("exec", func() inputs.Input {
|
||||
inputs.Add("exec", func() telegraf.Input {
|
||||
return NewExec()
|
||||
})
|
||||
}
|
||||
|
|
|
@ -9,11 +9,12 @@ import (
|
|||
"sync"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
)
|
||||
|
||||
func init() {
|
||||
inputs.Add("github_webhooks", func() inputs.Input { return &GithubWebhooks{} })
|
||||
inputs.Add("github_webhooks", func() telegraf.Input { return &GithubWebhooks{} })
|
||||
}
|
||||
|
||||
type GithubWebhooks struct {
|
||||
|
@ -40,7 +41,7 @@ func (gh *GithubWebhooks) Description() string {
|
|||
}
|
||||
|
||||
// Writes the points from <-gh.in to the Accumulator
|
||||
func (gh *GithubWebhooks) Gather(acc inputs.Accumulator) error {
|
||||
func (gh *GithubWebhooks) Gather(acc telegraf.Accumulator) error {
|
||||
gh.Lock()
|
||||
defer gh.Unlock()
|
||||
for _, event := range gh.events {
|
||||
|
|
|
@ -3,6 +3,7 @@ package haproxy
|
|||
import (
|
||||
"encoding/csv"
|
||||
"fmt"
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
"io"
|
||||
"net/http"
|
||||
|
@ -104,7 +105,7 @@ func (r *haproxy) Description() string {
|
|||
|
||||
// Reads stats from all configured servers accumulates stats.
|
||||
// Returns one of the errors encountered while gather stats (if any).
|
||||
func (g *haproxy) Gather(acc inputs.Accumulator) error {
|
||||
func (g *haproxy) Gather(acc telegraf.Accumulator) error {
|
||||
if len(g.Servers) == 0 {
|
||||
return g.gatherServer("http://127.0.0.1:1936", acc)
|
||||
}
|
||||
|
@ -126,7 +127,7 @@ func (g *haproxy) Gather(acc inputs.Accumulator) error {
|
|||
return outerr
|
||||
}
|
||||
|
||||
func (g *haproxy) gatherServer(addr string, acc inputs.Accumulator) error {
|
||||
func (g *haproxy) gatherServer(addr string, acc telegraf.Accumulator) error {
|
||||
if g.client == nil {
|
||||
|
||||
client := &http.Client{}
|
||||
|
@ -156,7 +157,7 @@ func (g *haproxy) gatherServer(addr string, acc inputs.Accumulator) error {
|
|||
return importCsvResult(res.Body, acc, u.Host)
|
||||
}
|
||||
|
||||
func importCsvResult(r io.Reader, acc inputs.Accumulator, host string) error {
|
||||
func importCsvResult(r io.Reader, acc telegraf.Accumulator, host string) error {
|
||||
csv := csv.NewReader(r)
|
||||
result, err := csv.ReadAll()
|
||||
now := time.Now()
|
||||
|
@ -358,7 +359,7 @@ func importCsvResult(r io.Reader, acc inputs.Accumulator, host string) error {
|
|||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("haproxy", func() inputs.Input {
|
||||
inputs.Add("haproxy", func() telegraf.Input {
|
||||
return &haproxy{}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
)
|
||||
|
@ -88,7 +89,7 @@ func (h *HttpJson) Description() string {
|
|||
}
|
||||
|
||||
// Gathers data for all servers.
|
||||
func (h *HttpJson) Gather(acc inputs.Accumulator) error {
|
||||
func (h *HttpJson) Gather(acc telegraf.Accumulator) error {
|
||||
var wg sync.WaitGroup
|
||||
|
||||
errorChannel := make(chan error, len(h.Servers))
|
||||
|
@ -127,7 +128,7 @@ func (h *HttpJson) Gather(acc inputs.Accumulator) error {
|
|||
// Returns:
|
||||
// error: Any error that may have occurred
|
||||
func (h *HttpJson) gatherServer(
|
||||
acc inputs.Accumulator,
|
||||
acc telegraf.Accumulator,
|
||||
serverURL string,
|
||||
) error {
|
||||
resp, responseTime, err := h.sendRequest(serverURL)
|
||||
|
@ -232,7 +233,7 @@ func (h *HttpJson) sendRequest(serverURL string) (string, float64, error) {
|
|||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("httpjson", func() inputs.Input {
|
||||
inputs.Add("httpjson", func() telegraf.Input {
|
||||
return &HttpJson{client: RealHTTPClient{client: &http.Client{}}}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
)
|
||||
|
||||
|
@ -32,7 +33,7 @@ func (*InfluxDB) SampleConfig() string {
|
|||
`
|
||||
}
|
||||
|
||||
func (i *InfluxDB) Gather(acc inputs.Accumulator) error {
|
||||
func (i *InfluxDB) Gather(acc telegraf.Accumulator) error {
|
||||
errorChannel := make(chan error, len(i.URLs))
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
@ -77,7 +78,7 @@ type point struct {
|
|||
// Returns:
|
||||
// error: Any error that may have occurred
|
||||
func (i *InfluxDB) gatherURL(
|
||||
acc inputs.Accumulator,
|
||||
acc telegraf.Accumulator,
|
||||
url string,
|
||||
) error {
|
||||
resp, err := http.Get(url)
|
||||
|
@ -140,7 +141,7 @@ func (i *InfluxDB) gatherURL(
|
|||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("influxdb", func() inputs.Input {
|
||||
inputs.Add("influxdb", func() telegraf.Input {
|
||||
return &InfluxDB{}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"net/http"
|
||||
"net/url"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
)
|
||||
|
||||
|
@ -108,7 +109,7 @@ func (j *Jolokia) getAttr(requestUrl *url.URL) (map[string]interface{}, error) {
|
|||
return jsonOut, nil
|
||||
}
|
||||
|
||||
func (j *Jolokia) Gather(acc inputs.Accumulator) error {
|
||||
func (j *Jolokia) Gather(acc telegraf.Accumulator) error {
|
||||
context := j.Context //"/jolokia/read"
|
||||
servers := j.Servers
|
||||
metrics := j.Metrics
|
||||
|
@ -157,7 +158,7 @@ func (j *Jolokia) Gather(acc inputs.Accumulator) error {
|
|||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("jolokia", func() inputs.Input {
|
||||
inputs.Add("jolokia", func() telegraf.Input {
|
||||
return &Jolokia{jClient: &JolokiaClientImpl{client: &http.Client{}}}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"sync"
|
||||
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
|
||||
"github.com/Shopify/sarama"
|
||||
|
@ -148,7 +149,7 @@ func (k *Kafka) Stop() {
|
|||
}
|
||||
}
|
||||
|
||||
func (k *Kafka) Gather(acc inputs.Accumulator) error {
|
||||
func (k *Kafka) Gather(acc telegraf.Accumulator) error {
|
||||
k.Lock()
|
||||
defer k.Unlock()
|
||||
npoints := len(k.pointChan)
|
||||
|
@ -160,7 +161,7 @@ func (k *Kafka) Gather(acc inputs.Accumulator) error {
|
|||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("kafka_consumer", func() inputs.Input {
|
||||
inputs.Add("kafka_consumer", func() telegraf.Input {
|
||||
return &Kafka{}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package leofs
|
|||
import (
|
||||
"bufio"
|
||||
"fmt"
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
"net/url"
|
||||
"os/exec"
|
||||
|
@ -146,7 +147,7 @@ func (l *LeoFS) Description() string {
|
|||
return "Read metrics from a LeoFS Server via SNMP"
|
||||
}
|
||||
|
||||
func (l *LeoFS) Gather(acc inputs.Accumulator) error {
|
||||
func (l *LeoFS) Gather(acc telegraf.Accumulator) error {
|
||||
if len(l.Servers) == 0 {
|
||||
l.gatherServer(defaultEndpoint, ServerTypeManagerMaster, acc)
|
||||
return nil
|
||||
|
@ -176,7 +177,7 @@ func (l *LeoFS) Gather(acc inputs.Accumulator) error {
|
|||
return outerr
|
||||
}
|
||||
|
||||
func (l *LeoFS) gatherServer(endpoint string, serverType ServerType, acc inputs.Accumulator) error {
|
||||
func (l *LeoFS) gatherServer(endpoint string, serverType ServerType, acc telegraf.Accumulator) error {
|
||||
cmd := exec.Command("snmpwalk", "-v2c", "-cpublic", endpoint, oid)
|
||||
stdout, err := cmd.StdoutPipe()
|
||||
if err != nil {
|
||||
|
@ -225,7 +226,7 @@ func retrieveTokenAfterColon(line string) (string, error) {
|
|||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("leofs", func() inputs.Input {
|
||||
inputs.Add("leofs", func() telegraf.Input {
|
||||
return &LeoFS{}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -13,6 +13,7 @@ import (
|
|||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
)
|
||||
|
@ -129,7 +130,7 @@ var wanted_mds_fields = []*mapping{
|
|||
},
|
||||
}
|
||||
|
||||
func (l *Lustre2) GetLustreProcStats(fileglob string, wanted_fields []*mapping, acc inputs.Accumulator) error {
|
||||
func (l *Lustre2) GetLustreProcStats(fileglob string, wanted_fields []*mapping, acc telegraf.Accumulator) error {
|
||||
files, err := filepath.Glob(fileglob)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -193,7 +194,7 @@ func (l *Lustre2) Description() string {
|
|||
}
|
||||
|
||||
// Gather reads stats from all lustre targets
|
||||
func (l *Lustre2) Gather(acc inputs.Accumulator) error {
|
||||
func (l *Lustre2) Gather(acc telegraf.Accumulator) error {
|
||||
l.allFields = make(map[string]map[string]interface{})
|
||||
|
||||
if len(l.Ost_procfiles) == 0 {
|
||||
|
@ -244,7 +245,7 @@ func (l *Lustre2) Gather(acc inputs.Accumulator) error {
|
|||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("lustre2", func() inputs.Input {
|
||||
inputs.Add("lustre2", func() telegraf.Input {
|
||||
return &Lustre2{}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
)
|
||||
|
||||
|
@ -34,7 +35,7 @@ func (m *MailChimp) Description() string {
|
|||
return "Gathers metrics from the /3.0/reports MailChimp API"
|
||||
}
|
||||
|
||||
func (m *MailChimp) Gather(acc inputs.Accumulator) error {
|
||||
func (m *MailChimp) Gather(acc telegraf.Accumulator) error {
|
||||
if m.api == nil {
|
||||
m.api = NewChimpAPI(m.ApiKey)
|
||||
}
|
||||
|
@ -71,7 +72,7 @@ func (m *MailChimp) Gather(acc inputs.Accumulator) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func gatherReport(acc inputs.Accumulator, report Report, now time.Time) {
|
||||
func gatherReport(acc telegraf.Accumulator, report Report, now time.Time) {
|
||||
tags := make(map[string]string)
|
||||
tags["id"] = report.ID
|
||||
tags["campaign_title"] = report.CampaignTitle
|
||||
|
@ -110,7 +111,7 @@ func gatherReport(acc inputs.Accumulator, report Report, now time.Time) {
|
|||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("mailchimp", func() inputs.Input {
|
||||
inputs.Add("mailchimp", func() telegraf.Input {
|
||||
return &MailChimp{}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
)
|
||||
|
||||
|
@ -69,7 +70,7 @@ func (m *Memcached) Description() string {
|
|||
}
|
||||
|
||||
// Gather reads stats from all configured servers accumulates stats
|
||||
func (m *Memcached) Gather(acc inputs.Accumulator) error {
|
||||
func (m *Memcached) Gather(acc telegraf.Accumulator) error {
|
||||
if len(m.Servers) == 0 && len(m.UnixSockets) == 0 {
|
||||
return m.gatherServer(":11211", false, acc)
|
||||
}
|
||||
|
@ -92,7 +93,7 @@ func (m *Memcached) Gather(acc inputs.Accumulator) error {
|
|||
func (m *Memcached) gatherServer(
|
||||
address string,
|
||||
unix bool,
|
||||
acc inputs.Accumulator,
|
||||
acc telegraf.Accumulator,
|
||||
) error {
|
||||
var conn net.Conn
|
||||
if unix {
|
||||
|
@ -178,7 +179,7 @@ func parseResponse(r *bufio.Reader) (map[string]string, error) {
|
|||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("memcached", func() inputs.Input {
|
||||
inputs.Add("memcached", func() telegraf.Input {
|
||||
return &Memcached{}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -1,12 +1,16 @@
|
|||
package inputs
|
||||
|
||||
import "github.com/stretchr/testify/mock"
|
||||
import (
|
||||
"github.com/influxdata/telegraf"
|
||||
|
||||
"github.com/stretchr/testify/mock"
|
||||
)
|
||||
|
||||
type MockPlugin struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
func (m *MockPlugin) Gather(_a0 Accumulator) error {
|
||||
func (m *MockPlugin) Gather(_a0 telegraf.Accumulator) error {
|
||||
ret := m.Called(_a0)
|
||||
|
||||
r0 := ret.Error(0)
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
"gopkg.in/mgo.v2"
|
||||
)
|
||||
|
@ -45,7 +46,7 @@ var localhost = &url.URL{Host: "127.0.0.1:27017"}
|
|||
|
||||
// Reads stats from all configured servers accumulates stats.
|
||||
// Returns one of the errors encountered while gather stats (if any).
|
||||
func (m *MongoDB) Gather(acc inputs.Accumulator) error {
|
||||
func (m *MongoDB) Gather(acc telegraf.Accumulator) error {
|
||||
if len(m.Servers) == 0 {
|
||||
m.gatherServer(m.getMongoServer(localhost), acc)
|
||||
return nil
|
||||
|
@ -88,7 +89,7 @@ func (m *MongoDB) getMongoServer(url *url.URL) *Server {
|
|||
return m.mongos[url.Host]
|
||||
}
|
||||
|
||||
func (m *MongoDB) gatherServer(server *Server, acc inputs.Accumulator) error {
|
||||
func (m *MongoDB) gatherServer(server *Server, acc telegraf.Accumulator) error {
|
||||
if server.Session == nil {
|
||||
var dialAddrs []string
|
||||
if server.Url.User != nil {
|
||||
|
@ -138,7 +139,7 @@ func (m *MongoDB) gatherServer(server *Server, acc inputs.Accumulator) error {
|
|||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("mongodb", func() inputs.Input {
|
||||
inputs.Add("mongodb", func() telegraf.Input {
|
||||
return &MongoDB{
|
||||
mongos: make(map[string]*Server),
|
||||
}
|
||||
|
|
|
@ -5,7 +5,7 @@ import (
|
|||
"reflect"
|
||||
"strconv"
|
||||
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
"github.com/influxdata/telegraf"
|
||||
)
|
||||
|
||||
type MongodbData struct {
|
||||
|
@ -97,7 +97,7 @@ func (d *MongodbData) add(key string, val interface{}) {
|
|||
d.Fields[key] = val
|
||||
}
|
||||
|
||||
func (d *MongodbData) flush(acc inputs.Accumulator) {
|
||||
func (d *MongodbData) flush(acc telegraf.Accumulator) {
|
||||
acc.AddFields(
|
||||
"mongodb",
|
||||
d.Fields,
|
||||
|
|
|
@ -4,7 +4,7 @@ import (
|
|||
"net/url"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
"github.com/influxdata/telegraf"
|
||||
"gopkg.in/mgo.v2"
|
||||
"gopkg.in/mgo.v2/bson"
|
||||
)
|
||||
|
@ -21,7 +21,7 @@ func (s *Server) getDefaultTags() map[string]string {
|
|||
return tags
|
||||
}
|
||||
|
||||
func (s *Server) gatherData(acc inputs.Accumulator) error {
|
||||
func (s *Server) gatherData(acc telegraf.Accumulator) error {
|
||||
s.Session.SetMode(mgo.Eventual, true)
|
||||
s.Session.SetSocketTimeout(0)
|
||||
result := &ServerStatus{}
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"strings"
|
||||
|
||||
_ "github.com/go-sql-driver/mysql"
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
)
|
||||
|
||||
|
@ -35,7 +36,7 @@ func (m *Mysql) Description() string {
|
|||
|
||||
var localhost = ""
|
||||
|
||||
func (m *Mysql) Gather(acc inputs.Accumulator) error {
|
||||
func (m *Mysql) Gather(acc telegraf.Accumulator) error {
|
||||
if len(m.Servers) == 0 {
|
||||
// if we can't get stats in this case, thats fine, don't report
|
||||
// an error.
|
||||
|
@ -113,7 +114,7 @@ var mappings = []*mapping{
|
|||
},
|
||||
}
|
||||
|
||||
func (m *Mysql) gatherServer(serv string, acc inputs.Accumulator) error {
|
||||
func (m *Mysql) gatherServer(serv string, acc telegraf.Accumulator) error {
|
||||
// If user forgot the '/', add it
|
||||
if strings.HasSuffix(serv, ")") {
|
||||
serv = serv + "/"
|
||||
|
@ -207,7 +208,7 @@ func (m *Mysql) gatherServer(serv string, acc inputs.Accumulator) error {
|
|||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("mysql", func() inputs.Input {
|
||||
inputs.Add("mysql", func() telegraf.Input {
|
||||
return &Mysql{}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
)
|
||||
|
||||
|
@ -31,7 +32,7 @@ func (n *Nginx) Description() string {
|
|||
return "Read Nginx's basic status information (ngx_http_stub_status_module)"
|
||||
}
|
||||
|
||||
func (n *Nginx) Gather(acc inputs.Accumulator) error {
|
||||
func (n *Nginx) Gather(acc telegraf.Accumulator) error {
|
||||
var wg sync.WaitGroup
|
||||
var outerr error
|
||||
|
||||
|
@ -59,7 +60,7 @@ var tr = &http.Transport{
|
|||
|
||||
var client = &http.Client{Transport: tr}
|
||||
|
||||
func (n *Nginx) gatherUrl(addr *url.URL, acc inputs.Accumulator) error {
|
||||
func (n *Nginx) gatherUrl(addr *url.URL, acc telegraf.Accumulator) error {
|
||||
resp, err := client.Get(addr.String())
|
||||
if err != nil {
|
||||
return fmt.Errorf("error making HTTP request to %s: %s", addr.String(), err)
|
||||
|
@ -159,7 +160,7 @@ func getTags(addr *url.URL) map[string]string {
|
|||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("nginx", func() inputs.Input {
|
||||
inputs.Add("nginx", func() telegraf.Input {
|
||||
return &Nginx{}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -31,6 +31,7 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
)
|
||||
|
||||
|
@ -49,7 +50,7 @@ const (
|
|||
)
|
||||
|
||||
func init() {
|
||||
inputs.Add("nsq", func() inputs.Input {
|
||||
inputs.Add("nsq", func() telegraf.Input {
|
||||
return &NSQ{}
|
||||
})
|
||||
}
|
||||
|
@ -62,7 +63,7 @@ func (n *NSQ) Description() string {
|
|||
return "Read NSQ topic and channel statistics."
|
||||
}
|
||||
|
||||
func (n *NSQ) Gather(acc inputs.Accumulator) error {
|
||||
func (n *NSQ) Gather(acc telegraf.Accumulator) error {
|
||||
var wg sync.WaitGroup
|
||||
var outerr error
|
||||
|
||||
|
@ -85,7 +86,7 @@ var tr = &http.Transport{
|
|||
|
||||
var client = &http.Client{Transport: tr}
|
||||
|
||||
func (n *NSQ) gatherEndpoint(e string, acc inputs.Accumulator) error {
|
||||
func (n *NSQ) gatherEndpoint(e string, acc telegraf.Accumulator) error {
|
||||
u, err := buildURL(e)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -136,7 +137,7 @@ func buildURL(e string) (*url.URL, error) {
|
|||
return addr, nil
|
||||
}
|
||||
|
||||
func topicStats(t TopicStats, acc inputs.Accumulator, host, version string) {
|
||||
func topicStats(t TopicStats, acc telegraf.Accumulator, host, version string) {
|
||||
// per topic overall (tag: name, paused, channel count)
|
||||
tags := map[string]string{
|
||||
"server_host": host,
|
||||
|
@ -157,7 +158,7 @@ func topicStats(t TopicStats, acc inputs.Accumulator, host, version string) {
|
|||
}
|
||||
}
|
||||
|
||||
func channelStats(c ChannelStats, acc inputs.Accumulator, host, version, topic string) {
|
||||
func channelStats(c ChannelStats, acc telegraf.Accumulator, host, version, topic string) {
|
||||
tags := map[string]string{
|
||||
"server_host": host,
|
||||
"server_version": version,
|
||||
|
@ -182,7 +183,7 @@ func channelStats(c ChannelStats, acc inputs.Accumulator, host, version, topic s
|
|||
}
|
||||
}
|
||||
|
||||
func clientStats(c ClientStats, acc inputs.Accumulator, host, version, topic, channel string) {
|
||||
func clientStats(c ClientStats, acc telegraf.Accumulator, host, version, topic, channel string) {
|
||||
tags := map[string]string{
|
||||
"server_host": host,
|
||||
"server_version": version,
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
"golang.org/x/net/html/charset"
|
||||
)
|
||||
|
@ -145,7 +146,7 @@ func (r *passenger) Description() string {
|
|||
return "Read metrics of passenger using passenger-status"
|
||||
}
|
||||
|
||||
func (g *passenger) Gather(acc inputs.Accumulator) error {
|
||||
func (g *passenger) Gather(acc telegraf.Accumulator) error {
|
||||
if g.Command == "" {
|
||||
g.Command = "passenger-status -v --show=xml"
|
||||
}
|
||||
|
@ -164,7 +165,7 @@ func (g *passenger) Gather(acc inputs.Accumulator) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func importMetric(stat []byte, acc inputs.Accumulator) error {
|
||||
func importMetric(stat []byte, acc telegraf.Accumulator) error {
|
||||
var p info
|
||||
|
||||
decoder := xml.NewDecoder(bytes.NewReader(stat))
|
||||
|
@ -244,7 +245,7 @@ func importMetric(stat []byte, acc inputs.Accumulator) error {
|
|||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("passenger", func() inputs.Input {
|
||||
inputs.Add("passenger", func() telegraf.Input {
|
||||
return &passenger{}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -12,6 +12,7 @@ import (
|
|||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
)
|
||||
|
||||
|
@ -73,7 +74,7 @@ func (r *phpfpm) Description() string {
|
|||
|
||||
// Reads stats from all configured servers accumulates stats.
|
||||
// Returns one of the errors encountered while gather stats (if any).
|
||||
func (g *phpfpm) Gather(acc inputs.Accumulator) error {
|
||||
func (g *phpfpm) Gather(acc telegraf.Accumulator) error {
|
||||
if len(g.Urls) == 0 {
|
||||
return g.gatherServer("http://127.0.0.1/status", acc)
|
||||
}
|
||||
|
@ -96,7 +97,7 @@ func (g *phpfpm) Gather(acc inputs.Accumulator) error {
|
|||
}
|
||||
|
||||
// Request status page to get stat raw data and import it
|
||||
func (g *phpfpm) gatherServer(addr string, acc inputs.Accumulator) error {
|
||||
func (g *phpfpm) gatherServer(addr string, acc telegraf.Accumulator) error {
|
||||
if g.client == nil {
|
||||
client := &http.Client{}
|
||||
g.client = client
|
||||
|
@ -140,7 +141,7 @@ func (g *phpfpm) gatherServer(addr string, acc inputs.Accumulator) error {
|
|||
}
|
||||
|
||||
// Gather stat using fcgi protocol
|
||||
func (g *phpfpm) gatherFcgi(fcgi *conn, statusPath string, acc inputs.Accumulator) error {
|
||||
func (g *phpfpm) gatherFcgi(fcgi *conn, statusPath string, acc telegraf.Accumulator) error {
|
||||
fpmOutput, fpmErr, err := fcgi.Request(map[string]string{
|
||||
"SCRIPT_NAME": "/" + statusPath,
|
||||
"SCRIPT_FILENAME": statusPath,
|
||||
|
@ -160,7 +161,7 @@ func (g *phpfpm) gatherFcgi(fcgi *conn, statusPath string, acc inputs.Accumulato
|
|||
}
|
||||
|
||||
// Gather stat using http protocol
|
||||
func (g *phpfpm) gatherHttp(addr string, acc inputs.Accumulator) error {
|
||||
func (g *phpfpm) gatherHttp(addr string, acc telegraf.Accumulator) error {
|
||||
u, err := url.Parse(addr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Unable parse server address '%s': %s", addr, err)
|
||||
|
@ -184,7 +185,7 @@ func (g *phpfpm) gatherHttp(addr string, acc inputs.Accumulator) error {
|
|||
}
|
||||
|
||||
// Import stat data into Telegraf system
|
||||
func importMetric(r io.Reader, acc inputs.Accumulator) (poolStat, error) {
|
||||
func importMetric(r io.Reader, acc telegraf.Accumulator) (poolStat, error) {
|
||||
stats := make(poolStat)
|
||||
var currentPool string
|
||||
|
||||
|
@ -239,7 +240,7 @@ func importMetric(r io.Reader, acc inputs.Accumulator) (poolStat, error) {
|
|||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("phpfpm", func() inputs.Input {
|
||||
inputs.Add("phpfpm", func() telegraf.Input {
|
||||
return &phpfpm{}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
)
|
||||
|
||||
|
@ -56,7 +57,7 @@ func (_ *Ping) SampleConfig() string {
|
|||
return sampleConfig
|
||||
}
|
||||
|
||||
func (p *Ping) Gather(acc inputs.Accumulator) error {
|
||||
func (p *Ping) Gather(acc telegraf.Accumulator) error {
|
||||
|
||||
var wg sync.WaitGroup
|
||||
errorChannel := make(chan error, len(p.Urls)*2)
|
||||
|
@ -64,7 +65,7 @@ func (p *Ping) Gather(acc inputs.Accumulator) error {
|
|||
// Spin off a go routine for each url to ping
|
||||
for _, url := range p.Urls {
|
||||
wg.Add(1)
|
||||
go func(url string, acc inputs.Accumulator) {
|
||||
go func(url string, acc telegraf.Accumulator) {
|
||||
defer wg.Done()
|
||||
args := p.args(url)
|
||||
out, err := p.pingHost(args...)
|
||||
|
@ -176,7 +177,7 @@ func processPingOutput(out string) (int, int, float64, error) {
|
|||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("ping", func() inputs.Input {
|
||||
inputs.Add("ping", func() telegraf.Input {
|
||||
return &Ping{pingHost: hostPinger}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
|
||||
_ "github.com/lib/pq"
|
||||
|
@ -53,7 +54,7 @@ func (p *Postgresql) IgnoredColumns() map[string]bool {
|
|||
|
||||
var localhost = "host=localhost sslmode=disable"
|
||||
|
||||
func (p *Postgresql) Gather(acc inputs.Accumulator) error {
|
||||
func (p *Postgresql) Gather(acc telegraf.Accumulator) error {
|
||||
var query string
|
||||
|
||||
if p.Address == "" || p.Address == "localhost" {
|
||||
|
@ -101,7 +102,7 @@ type scanner interface {
|
|||
Scan(dest ...interface{}) error
|
||||
}
|
||||
|
||||
func (p *Postgresql) accRow(row scanner, acc inputs.Accumulator) error {
|
||||
func (p *Postgresql) accRow(row scanner, acc telegraf.Accumulator) error {
|
||||
var columnVars []interface{}
|
||||
var dbname bytes.Buffer
|
||||
|
||||
|
@ -145,7 +146,7 @@ func (p *Postgresql) accRow(row scanner, acc inputs.Accumulator) error {
|
|||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("postgresql", func() inputs.Input {
|
||||
inputs.Add("postgresql", func() telegraf.Input {
|
||||
return &Postgresql{}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
|
||||
"github.com/shirou/gopsutil/process"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
)
|
||||
|
||||
|
@ -49,7 +50,7 @@ func (_ *Procstat) Description() string {
|
|||
return "Monitor process cpu and memory usage"
|
||||
}
|
||||
|
||||
func (p *Procstat) Gather(acc inputs.Accumulator) error {
|
||||
func (p *Procstat) Gather(acc telegraf.Accumulator) error {
|
||||
err := p.createProcesses()
|
||||
if err != nil {
|
||||
log.Printf("Error: procstat getting process, exe: [%s] pidfile: [%s] pattern: [%s] %s",
|
||||
|
@ -175,7 +176,7 @@ func pidsFromPattern(pattern string) ([]int32, error) {
|
|||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("procstat", func() inputs.Input {
|
||||
inputs.Add("procstat", func() telegraf.Input {
|
||||
return NewProcstat()
|
||||
})
|
||||
}
|
||||
|
|
|
@ -6,14 +6,14 @@ import (
|
|||
|
||||
"github.com/shirou/gopsutil/process"
|
||||
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
"github.com/influxdata/telegraf"
|
||||
)
|
||||
|
||||
type SpecProcessor struct {
|
||||
Prefix string
|
||||
tags map[string]string
|
||||
fields map[string]interface{}
|
||||
acc inputs.Accumulator
|
||||
acc telegraf.Accumulator
|
||||
proc *process.Process
|
||||
}
|
||||
|
||||
|
@ -34,7 +34,7 @@ func (p *SpecProcessor) flush() {
|
|||
|
||||
func NewSpecProcessor(
|
||||
prefix string,
|
||||
acc inputs.Accumulator,
|
||||
acc telegraf.Accumulator,
|
||||
p *process.Process,
|
||||
) *SpecProcessor {
|
||||
tags := make(map[string]string)
|
||||
|
|
|
@ -3,6 +3,7 @@ package prometheus
|
|||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
"github.com/prometheus/common/expfmt"
|
||||
"github.com/prometheus/common/model"
|
||||
|
@ -32,7 +33,7 @@ var ErrProtocolError = errors.New("prometheus protocol error")
|
|||
|
||||
// Reads stats from all configured servers accumulates stats.
|
||||
// Returns one of the errors encountered while gather stats (if any).
|
||||
func (g *Prometheus) Gather(acc inputs.Accumulator) error {
|
||||
func (g *Prometheus) Gather(acc telegraf.Accumulator) error {
|
||||
var wg sync.WaitGroup
|
||||
|
||||
var outerr error
|
||||
|
@ -50,7 +51,7 @@ func (g *Prometheus) Gather(acc inputs.Accumulator) error {
|
|||
return outerr
|
||||
}
|
||||
|
||||
func (g *Prometheus) gatherURL(url string, acc inputs.Accumulator) error {
|
||||
func (g *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error {
|
||||
resp, err := http.Get(url)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error making HTTP request to %s: %s", url, err)
|
||||
|
@ -97,7 +98,7 @@ func (g *Prometheus) gatherURL(url string, acc inputs.Accumulator) error {
|
|||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("prometheus", func() inputs.Input {
|
||||
inputs.Add("prometheus", func() telegraf.Input {
|
||||
return &Prometheus{}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"reflect"
|
||||
"strings"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
)
|
||||
|
||||
|
@ -82,7 +83,7 @@ func (pa *PuppetAgent) Description() string {
|
|||
}
|
||||
|
||||
// Gather reads stats from all configured servers accumulates stats
|
||||
func (pa *PuppetAgent) Gather(acc inputs.Accumulator) error {
|
||||
func (pa *PuppetAgent) Gather(acc telegraf.Accumulator) error {
|
||||
|
||||
if len(pa.Location) == 0 {
|
||||
pa.Location = "/var/lib/puppet/state/last_run_summary.yaml"
|
||||
|
@ -110,7 +111,7 @@ func (pa *PuppetAgent) Gather(acc inputs.Accumulator) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func structPrinter(s *State, acc inputs.Accumulator, tags map[string]string) {
|
||||
func structPrinter(s *State, acc telegraf.Accumulator, tags map[string]string) {
|
||||
e := reflect.ValueOf(s).Elem()
|
||||
|
||||
fields := make(map[string]interface{})
|
||||
|
@ -131,7 +132,7 @@ func structPrinter(s *State, acc inputs.Accumulator, tags map[string]string) {
|
|||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("puppetagent", func() inputs.Input {
|
||||
inputs.Add("puppetagent", func() telegraf.Input {
|
||||
return &PuppetAgent{}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
)
|
||||
|
||||
|
@ -96,7 +97,7 @@ type Node struct {
|
|||
SocketsUsed int64 `json:"sockets_used"`
|
||||
}
|
||||
|
||||
type gatherFunc func(r *RabbitMQ, acc inputs.Accumulator, errChan chan error)
|
||||
type gatherFunc func(r *RabbitMQ, acc telegraf.Accumulator, errChan chan error)
|
||||
|
||||
var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues}
|
||||
|
||||
|
@ -119,7 +120,7 @@ func (r *RabbitMQ) Description() string {
|
|||
return "Read metrics from one or many RabbitMQ servers via the management API"
|
||||
}
|
||||
|
||||
func (r *RabbitMQ) Gather(acc inputs.Accumulator) error {
|
||||
func (r *RabbitMQ) Gather(acc telegraf.Accumulator) error {
|
||||
if r.Client == nil {
|
||||
r.Client = &http.Client{}
|
||||
}
|
||||
|
@ -172,7 +173,7 @@ func (r *RabbitMQ) requestJSON(u string, target interface{}) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func gatherOverview(r *RabbitMQ, acc inputs.Accumulator, errChan chan error) {
|
||||
func gatherOverview(r *RabbitMQ, acc telegraf.Accumulator, errChan chan error) {
|
||||
overview := &OverviewResponse{}
|
||||
|
||||
err := r.requestJSON("/api/overview", &overview)
|
||||
|
@ -208,7 +209,7 @@ func gatherOverview(r *RabbitMQ, acc inputs.Accumulator, errChan chan error) {
|
|||
errChan <- nil
|
||||
}
|
||||
|
||||
func gatherNodes(r *RabbitMQ, acc inputs.Accumulator, errChan chan error) {
|
||||
func gatherNodes(r *RabbitMQ, acc telegraf.Accumulator, errChan chan error) {
|
||||
nodes := make([]Node, 0)
|
||||
// Gather information about nodes
|
||||
err := r.requestJSON("/api/nodes", &nodes)
|
||||
|
@ -245,7 +246,7 @@ func gatherNodes(r *RabbitMQ, acc inputs.Accumulator, errChan chan error) {
|
|||
errChan <- nil
|
||||
}
|
||||
|
||||
func gatherQueues(r *RabbitMQ, acc inputs.Accumulator, errChan chan error) {
|
||||
func gatherQueues(r *RabbitMQ, acc telegraf.Accumulator, errChan chan error) {
|
||||
// Gather information about queues
|
||||
queues := make([]Queue, 0)
|
||||
err := r.requestJSON("/api/queues", &queues)
|
||||
|
@ -330,7 +331,7 @@ func (r *RabbitMQ) shouldGatherQueue(queue Queue) bool {
|
|||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("rabbitmq", func() inputs.Input {
|
||||
inputs.Add("rabbitmq", func() telegraf.Input {
|
||||
return &RabbitMQ{}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
)
|
||||
|
||||
|
@ -76,7 +77,7 @@ var ErrProtocolError = errors.New("redis protocol error")
|
|||
|
||||
// Reads stats from all configured servers accumulates stats.
|
||||
// Returns one of the errors encountered while gather stats (if any).
|
||||
func (r *Redis) Gather(acc inputs.Accumulator) error {
|
||||
func (r *Redis) Gather(acc telegraf.Accumulator) error {
|
||||
if len(r.Servers) == 0 {
|
||||
url := &url.URL{
|
||||
Host: ":6379",
|
||||
|
@ -113,7 +114,7 @@ func (r *Redis) Gather(acc inputs.Accumulator) error {
|
|||
|
||||
const defaultPort = "6379"
|
||||
|
||||
func (r *Redis) gatherServer(addr *url.URL, acc inputs.Accumulator) error {
|
||||
func (r *Redis) gatherServer(addr *url.URL, acc telegraf.Accumulator) error {
|
||||
_, _, err := net.SplitHostPort(addr.Host)
|
||||
if err != nil {
|
||||
addr.Host = addr.Host + ":" + defaultPort
|
||||
|
@ -158,7 +159,7 @@ func (r *Redis) gatherServer(addr *url.URL, acc inputs.Accumulator) error {
|
|||
// gatherInfoOutput gathers
|
||||
func gatherInfoOutput(
|
||||
rdr *bufio.Reader,
|
||||
acc inputs.Accumulator,
|
||||
acc telegraf.Accumulator,
|
||||
tags map[string]string,
|
||||
) error {
|
||||
var keyspace_hits, keyspace_misses uint64 = 0, 0
|
||||
|
@ -227,7 +228,7 @@ func gatherInfoOutput(
|
|||
func gatherKeyspaceLine(
|
||||
name string,
|
||||
line string,
|
||||
acc inputs.Accumulator,
|
||||
acc telegraf.Accumulator,
|
||||
tags map[string]string,
|
||||
) {
|
||||
if strings.Contains(line, "keys=") {
|
||||
|
@ -246,7 +247,7 @@ func gatherKeyspaceLine(
|
|||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("redis", func() inputs.Input {
|
||||
inputs.Add("redis", func() telegraf.Input {
|
||||
return &Redis{}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -1,53 +1,8 @@
|
|||
package inputs
|
||||
|
||||
import "time"
|
||||
import "github.com/influxdata/telegraf"
|
||||
|
||||
type Accumulator interface {
|
||||
// Create a point with a value, decorating it with tags
|
||||
// NOTE: tags is expected to be owned by the caller, don't mutate
|
||||
// it after passing to Add.
|
||||
Add(measurement string,
|
||||
value interface{},
|
||||
tags map[string]string,
|
||||
t ...time.Time)
|
||||
|
||||
AddFields(measurement string,
|
||||
fields map[string]interface{},
|
||||
tags map[string]string,
|
||||
t ...time.Time)
|
||||
}
|
||||
|
||||
type Input interface {
|
||||
// SampleConfig returns the default configuration of the Input
|
||||
SampleConfig() string
|
||||
|
||||
// Description returns a one-sentence description on the Input
|
||||
Description() string
|
||||
|
||||
// Gather takes in an accumulator and adds the metrics that the Input
|
||||
// gathers. This is called every "interval"
|
||||
Gather(Accumulator) error
|
||||
}
|
||||
|
||||
type ServiceInput interface {
|
||||
// SampleConfig returns the default configuration of the Input
|
||||
SampleConfig() string
|
||||
|
||||
// Description returns a one-sentence description on the Input
|
||||
Description() string
|
||||
|
||||
// Gather takes in an accumulator and adds the metrics that the Input
|
||||
// gathers. This is called every "interval"
|
||||
Gather(Accumulator) error
|
||||
|
||||
// Start starts the ServiceInput's service, whatever that may be
|
||||
Start() error
|
||||
|
||||
// Stop stops the services and closes any necessary channels and connections
|
||||
Stop()
|
||||
}
|
||||
|
||||
type Creator func() Input
|
||||
type Creator func() telegraf.Input
|
||||
|
||||
var Inputs = map[string]Creator{}
|
||||
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"net/url"
|
||||
"sync"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
|
||||
"gopkg.in/dancannon/gorethink.v1"
|
||||
|
@ -35,7 +36,7 @@ var localhost = &Server{Url: &url.URL{Host: "127.0.0.1:28015"}}
|
|||
|
||||
// Reads stats from all configured servers accumulates stats.
|
||||
// Returns one of the errors encountered while gather stats (if any).
|
||||
func (r *RethinkDB) Gather(acc inputs.Accumulator) error {
|
||||
func (r *RethinkDB) Gather(acc telegraf.Accumulator) error {
|
||||
if len(r.Servers) == 0 {
|
||||
r.gatherServer(localhost, acc)
|
||||
return nil
|
||||
|
@ -65,7 +66,7 @@ func (r *RethinkDB) Gather(acc inputs.Accumulator) error {
|
|||
return outerr
|
||||
}
|
||||
|
||||
func (r *RethinkDB) gatherServer(server *Server, acc inputs.Accumulator) error {
|
||||
func (r *RethinkDB) gatherServer(server *Server, acc telegraf.Accumulator) error {
|
||||
var err error
|
||||
connectOpts := gorethink.ConnectOpts{
|
||||
Address: server.Url.Host,
|
||||
|
@ -87,7 +88,7 @@ func (r *RethinkDB) gatherServer(server *Server, acc inputs.Accumulator) error {
|
|||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("rethinkdb", func() inputs.Input {
|
||||
inputs.Add("rethinkdb", func() telegraf.Input {
|
||||
return &RethinkDB{}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -4,7 +4,7 @@ import (
|
|||
"reflect"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
"github.com/influxdata/telegraf"
|
||||
)
|
||||
|
||||
type serverStatus struct {
|
||||
|
@ -88,7 +88,7 @@ var engineStats = map[string]string{
|
|||
|
||||
func (e *Engine) AddEngineStats(
|
||||
keys []string,
|
||||
acc inputs.Accumulator,
|
||||
acc telegraf.Accumulator,
|
||||
tags map[string]string,
|
||||
) {
|
||||
engine := reflect.ValueOf(e).Elem()
|
||||
|
@ -99,7 +99,7 @@ func (e *Engine) AddEngineStats(
|
|||
acc.AddFields("rethinkdb_engine", fields, tags)
|
||||
}
|
||||
|
||||
func (s *Storage) AddStats(acc inputs.Accumulator, tags map[string]string) {
|
||||
func (s *Storage) AddStats(acc telegraf.Accumulator, tags map[string]string) {
|
||||
fields := map[string]interface{}{
|
||||
"cache_bytes_in_use": s.Cache.BytesInUse,
|
||||
"disk_read_bytes_per_sec": s.Disk.ReadBytesPerSec,
|
||||
|
|
|
@ -9,7 +9,7 @@ import (
|
|||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
"github.com/influxdata/telegraf"
|
||||
|
||||
"gopkg.in/dancannon/gorethink.v1"
|
||||
)
|
||||
|
@ -20,7 +20,7 @@ type Server struct {
|
|||
serverStatus serverStatus
|
||||
}
|
||||
|
||||
func (s *Server) gatherData(acc inputs.Accumulator) error {
|
||||
func (s *Server) gatherData(acc telegraf.Accumulator) error {
|
||||
if err := s.getServerStatus(); err != nil {
|
||||
return fmt.Errorf("Failed to get server_status, %s\n", err)
|
||||
}
|
||||
|
@ -110,7 +110,7 @@ var ClusterTracking = []string{
|
|||
"written_docs_per_sec",
|
||||
}
|
||||
|
||||
func (s *Server) addClusterStats(acc inputs.Accumulator) error {
|
||||
func (s *Server) addClusterStats(acc telegraf.Accumulator) error {
|
||||
cursor, err := gorethink.DB("rethinkdb").Table("stats").Get([]string{"cluster"}).Run(s.session)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cluster stats query error, %s\n", err.Error())
|
||||
|
@ -138,7 +138,7 @@ var MemberTracking = []string{
|
|||
"total_writes",
|
||||
}
|
||||
|
||||
func (s *Server) addMemberStats(acc inputs.Accumulator) error {
|
||||
func (s *Server) addMemberStats(acc telegraf.Accumulator) error {
|
||||
cursor, err := gorethink.DB("rethinkdb").Table("stats").Get([]string{"server", s.serverStatus.Id}).Run(s.session)
|
||||
if err != nil {
|
||||
return fmt.Errorf("member stats query error, %s\n", err.Error())
|
||||
|
@ -162,7 +162,7 @@ var TableTracking = []string{
|
|||
"total_writes",
|
||||
}
|
||||
|
||||
func (s *Server) addTableStats(acc inputs.Accumulator) error {
|
||||
func (s *Server) addTableStats(acc telegraf.Accumulator) error {
|
||||
tablesCursor, err := gorethink.DB("rethinkdb").Table("table_status").Run(s.session)
|
||||
defer tablesCursor.Close()
|
||||
var tables []tableStatus
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
|
||||
"github.com/md14454/gosensors"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
)
|
||||
|
||||
|
@ -35,7 +36,7 @@ func (_ *Sensors) SampleConfig() string {
|
|||
return sensorsSampleConfig
|
||||
}
|
||||
|
||||
func (s *Sensors) Gather(acc inputs.Accumulator) error {
|
||||
func (s *Sensors) Gather(acc telegraf.Accumulator) error {
|
||||
gosensors.Init()
|
||||
defer gosensors.Cleanup()
|
||||
|
||||
|
@ -84,7 +85,7 @@ func (s *Sensors) Gather(acc inputs.Accumulator) error {
|
|||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("sensors", func() inputs.Input {
|
||||
inputs.Add("sensors", func() telegraf.Input {
|
||||
return &Sensors{}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
|
||||
"github.com/soniah/gosnmp"
|
||||
|
@ -187,7 +188,7 @@ func findnodename(node Node, ids []string) (string, string) {
|
|||
return node.name, ""
|
||||
}
|
||||
|
||||
func (s *Snmp) Gather(acc inputs.Accumulator) error {
|
||||
func (s *Snmp) Gather(acc telegraf.Accumulator) error {
|
||||
// Create oid tree
|
||||
if s.SnmptranslateFile != "" && len(initNode.subnodes) == 0 {
|
||||
data, err := ioutil.ReadFile(s.SnmptranslateFile)
|
||||
|
@ -283,7 +284,7 @@ func (s *Snmp) Gather(acc inputs.Accumulator) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (h *Host) SNMPGet(acc inputs.Accumulator) error {
|
||||
func (h *Host) SNMPGet(acc telegraf.Accumulator) error {
|
||||
// Get snmp client
|
||||
snmpClient, err := h.GetSNMPClient()
|
||||
if err != nil {
|
||||
|
@ -324,7 +325,7 @@ func (h *Host) SNMPGet(acc inputs.Accumulator) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (h *Host) SNMPBulk(acc inputs.Accumulator) error {
|
||||
func (h *Host) SNMPBulk(acc telegraf.Accumulator) error {
|
||||
// Get snmp client
|
||||
snmpClient, err := h.GetSNMPClient()
|
||||
if err != nil {
|
||||
|
@ -411,7 +412,7 @@ func (h *Host) GetSNMPClient() (*gosnmp.GoSNMP, error) {
|
|||
return snmpClient, nil
|
||||
}
|
||||
|
||||
func (h *Host) HandleResponse(oids map[string]Data, result *gosnmp.SnmpPacket, acc inputs.Accumulator) (string, error) {
|
||||
func (h *Host) HandleResponse(oids map[string]Data, result *gosnmp.SnmpPacket, acc telegraf.Accumulator) (string, error) {
|
||||
var lastOid string
|
||||
for _, variable := range result.Variables {
|
||||
lastOid = variable.Name
|
||||
|
@ -467,7 +468,7 @@ func (h *Host) HandleResponse(oids map[string]Data, result *gosnmp.SnmpPacket, a
|
|||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("snmp", func() inputs.Input {
|
||||
inputs.Add("snmp", func() telegraf.Input {
|
||||
return &Snmp{}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -2,6 +2,7 @@ package sqlserver
|
|||
|
||||
import (
|
||||
"database/sql"
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
"sync"
|
||||
"time"
|
||||
|
@ -70,7 +71,7 @@ func initQueries() {
|
|||
}
|
||||
|
||||
// Gather collect data from SQL Server
|
||||
func (s *SQLServer) Gather(acc inputs.Accumulator) error {
|
||||
func (s *SQLServer) Gather(acc telegraf.Accumulator) error {
|
||||
initQueries()
|
||||
|
||||
if len(s.Servers) == 0 {
|
||||
|
@ -94,7 +95,7 @@ func (s *SQLServer) Gather(acc inputs.Accumulator) error {
|
|||
return outerr
|
||||
}
|
||||
|
||||
func (s *SQLServer) gatherServer(server string, query Query, acc inputs.Accumulator) error {
|
||||
func (s *SQLServer) gatherServer(server string, query Query, acc telegraf.Accumulator) error {
|
||||
// deferred opening
|
||||
conn, err := sql.Open("mssql", server)
|
||||
if err != nil {
|
||||
|
@ -130,7 +131,7 @@ func (s *SQLServer) gatherServer(server string, query Query, acc inputs.Accumula
|
|||
return rows.Err()
|
||||
}
|
||||
|
||||
func (s *SQLServer) accRow(query Query, acc inputs.Accumulator, row scanner) error {
|
||||
func (s *SQLServer) accRow(query Query, acc telegraf.Accumulator, row scanner) error {
|
||||
var columnVars []interface{}
|
||||
var fields = make(map[string]interface{})
|
||||
|
||||
|
@ -180,7 +181,7 @@ func (s *SQLServer) accRow(query Query, acc inputs.Accumulator, row scanner) err
|
|||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("sqlserver", func() inputs.Input {
|
||||
inputs.Add("sqlserver", func() telegraf.Input {
|
||||
return &SQLServer{}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -12,6 +12,7 @@ import (
|
|||
|
||||
"github.com/influxdata/influxdb/services/graphite"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
)
|
||||
|
||||
|
@ -156,7 +157,7 @@ func (_ *Statsd) SampleConfig() string {
|
|||
return sampleConfig
|
||||
}
|
||||
|
||||
func (s *Statsd) Gather(acc inputs.Accumulator) error {
|
||||
func (s *Statsd) Gather(acc telegraf.Accumulator) error {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
|
@ -515,7 +516,7 @@ func (s *Statsd) Stop() {
|
|||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("statsd", func() inputs.Input {
|
||||
inputs.Add("statsd", func() telegraf.Input {
|
||||
return &Statsd{
|
||||
ConvertNames: true,
|
||||
UDPPacketSize: UDP_PACKET_SIZE,
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
"github.com/shirou/gopsutil/cpu"
|
||||
)
|
||||
|
@ -39,7 +40,7 @@ func (_ *CPUStats) SampleConfig() string {
|
|||
return sampleConfig
|
||||
}
|
||||
|
||||
func (s *CPUStats) Gather(acc inputs.Accumulator) error {
|
||||
func (s *CPUStats) Gather(acc telegraf.Accumulator) error {
|
||||
times, err := s.ps.CPUTimes(s.PerCPU, s.TotalCPU)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error getting CPU info: %s", err)
|
||||
|
@ -111,7 +112,7 @@ func totalCpuTime(t cpu.CPUTimesStat) float64 {
|
|||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("cpu", func() inputs.Input {
|
||||
inputs.Add("cpu", func() telegraf.Input {
|
||||
return &CPUStats{ps: &systemPS{}}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package system
|
|||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
)
|
||||
|
||||
|
@ -29,7 +30,7 @@ func (_ *DiskStats) SampleConfig() string {
|
|||
return diskSampleConfig
|
||||
}
|
||||
|
||||
func (s *DiskStats) Gather(acc inputs.Accumulator) error {
|
||||
func (s *DiskStats) Gather(acc telegraf.Accumulator) error {
|
||||
// Legacy support:
|
||||
if len(s.Mountpoints) != 0 {
|
||||
s.MountPoints = s.Mountpoints
|
||||
|
@ -90,7 +91,7 @@ func (_ *DiskIOStats) SampleConfig() string {
|
|||
return diskIoSampleConfig
|
||||
}
|
||||
|
||||
func (s *DiskIOStats) Gather(acc inputs.Accumulator) error {
|
||||
func (s *DiskIOStats) Gather(acc telegraf.Accumulator) error {
|
||||
diskio, err := s.ps.DiskIO()
|
||||
if err != nil {
|
||||
return fmt.Errorf("error getting disk io info: %s", err)
|
||||
|
@ -136,11 +137,11 @@ func (s *DiskIOStats) Gather(acc inputs.Accumulator) error {
|
|||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("disk", func() inputs.Input {
|
||||
inputs.Add("disk", func() telegraf.Input {
|
||||
return &DiskStats{ps: &systemPS{}}
|
||||
})
|
||||
|
||||
inputs.Add("diskio", func() inputs.Input {
|
||||
inputs.Add("diskio", func() telegraf.Input {
|
||||
return &DiskIOStats{ps: &systemPS{}}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package system
|
|||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
)
|
||||
|
||||
|
@ -16,7 +17,7 @@ func (_ *MemStats) Description() string {
|
|||
|
||||
func (_ *MemStats) SampleConfig() string { return "" }
|
||||
|
||||
func (s *MemStats) Gather(acc inputs.Accumulator) error {
|
||||
func (s *MemStats) Gather(acc telegraf.Accumulator) error {
|
||||
vm, err := s.ps.VMStat()
|
||||
if err != nil {
|
||||
return fmt.Errorf("error getting virtual memory info: %s", err)
|
||||
|
@ -47,7 +48,7 @@ func (_ *SwapStats) Description() string {
|
|||
|
||||
func (_ *SwapStats) SampleConfig() string { return "" }
|
||||
|
||||
func (s *SwapStats) Gather(acc inputs.Accumulator) error {
|
||||
func (s *SwapStats) Gather(acc telegraf.Accumulator) error {
|
||||
swap, err := s.ps.SwapStat()
|
||||
if err != nil {
|
||||
return fmt.Errorf("error getting swap memory info: %s", err)
|
||||
|
@ -67,11 +68,11 @@ func (s *SwapStats) Gather(acc inputs.Accumulator) error {
|
|||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("mem", func() inputs.Input {
|
||||
inputs.Add("mem", func() telegraf.Input {
|
||||
return &MemStats{ps: &systemPS{}}
|
||||
})
|
||||
|
||||
inputs.Add("swap", func() inputs.Input {
|
||||
inputs.Add("swap", func() telegraf.Input {
|
||||
return &SwapStats{ps: &systemPS{}}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"net"
|
||||
"strings"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
)
|
||||
|
||||
|
@ -31,7 +32,7 @@ func (_ *NetIOStats) SampleConfig() string {
|
|||
return netSampleConfig
|
||||
}
|
||||
|
||||
func (s *NetIOStats) Gather(acc inputs.Accumulator) error {
|
||||
func (s *NetIOStats) Gather(acc telegraf.Accumulator) error {
|
||||
netio, err := s.ps.NetIO()
|
||||
if err != nil {
|
||||
return fmt.Errorf("error getting net io info: %s", err)
|
||||
|
@ -103,7 +104,7 @@ func (s *NetIOStats) Gather(acc inputs.Accumulator) error {
|
|||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("net", func() inputs.Input {
|
||||
inputs.Add("net", func() telegraf.Input {
|
||||
return &NetIOStats{ps: &systemPS{}}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"fmt"
|
||||
"syscall"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
)
|
||||
|
||||
|
@ -21,7 +22,7 @@ func (_ *NetStats) SampleConfig() string {
|
|||
return tcpstatSampleConfig
|
||||
}
|
||||
|
||||
func (s *NetStats) Gather(acc inputs.Accumulator) error {
|
||||
func (s *NetStats) Gather(acc telegraf.Accumulator) error {
|
||||
netconns, err := s.ps.NetConnections()
|
||||
if err != nil {
|
||||
return fmt.Errorf("error getting net connections info: %s", err)
|
||||
|
@ -64,7 +65,7 @@ func (s *NetStats) Gather(acc inputs.Accumulator) error {
|
|||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("netstat", func() inputs.Input {
|
||||
inputs.Add("netstat", func() telegraf.Input {
|
||||
return &NetStats{ps: &systemPS{}}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -3,8 +3,8 @@ package system
|
|||
import (
|
||||
"os"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
|
||||
"github.com/shirou/gopsutil/cpu"
|
||||
"github.com/shirou/gopsutil/disk"
|
||||
|
@ -23,7 +23,7 @@ type PS interface {
|
|||
NetConnections() ([]net.NetConnectionStat, error)
|
||||
}
|
||||
|
||||
func add(acc inputs.Accumulator,
|
||||
func add(acc telegraf.Accumulator,
|
||||
name string, val float64, tags map[string]string) {
|
||||
if val >= 0 {
|
||||
acc.Add(name, val, tags)
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"github.com/shirou/gopsutil/host"
|
||||
"github.com/shirou/gopsutil/load"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
)
|
||||
|
||||
|
@ -19,7 +20,7 @@ func (_ *SystemStats) Description() string {
|
|||
|
||||
func (_ *SystemStats) SampleConfig() string { return "" }
|
||||
|
||||
func (_ *SystemStats) Gather(acc inputs.Accumulator) error {
|
||||
func (_ *SystemStats) Gather(acc telegraf.Accumulator) error {
|
||||
loadavg, err := load.LoadAvg()
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -68,7 +69,7 @@ func format_uptime(uptime uint64) string {
|
|||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("system", func() inputs.Input {
|
||||
inputs.Add("system", func() telegraf.Input {
|
||||
return &SystemStats{}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package trig
|
|||
import (
|
||||
"math"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
)
|
||||
|
||||
|
@ -24,7 +25,7 @@ func (s *Trig) Description() string {
|
|||
return "Inserts sine and cosine waves for demonstration purposes"
|
||||
}
|
||||
|
||||
func (s *Trig) Gather(acc inputs.Accumulator) error {
|
||||
func (s *Trig) Gather(acc telegraf.Accumulator) error {
|
||||
sinner := math.Sin((s.x*math.Pi)/5.0) * s.Amplitude
|
||||
cosinner := math.Cos((s.x*math.Pi)/5.0) * s.Amplitude
|
||||
|
||||
|
@ -41,5 +42,5 @@ func (s *Trig) Gather(acc inputs.Accumulator) error {
|
|||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("Trig", func() inputs.Input { return &Trig{x: 0.0} })
|
||||
inputs.Add("Trig", func() telegraf.Input { return &Trig{x: 0.0} })
|
||||
}
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"net"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
)
|
||||
|
||||
|
@ -31,7 +32,7 @@ func (t *Twemproxy) Description() string {
|
|||
}
|
||||
|
||||
// Gather data from all Twemproxy instances
|
||||
func (t *Twemproxy) Gather(acc inputs.Accumulator) error {
|
||||
func (t *Twemproxy) Gather(acc telegraf.Accumulator) error {
|
||||
conn, err := net.DialTimeout("tcp", t.Addr, 1*time.Second)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -55,7 +56,7 @@ func (t *Twemproxy) Gather(acc inputs.Accumulator) error {
|
|||
|
||||
// Process Twemproxy server stats
|
||||
func (t *Twemproxy) processStat(
|
||||
acc inputs.Accumulator,
|
||||
acc telegraf.Accumulator,
|
||||
tags map[string]string,
|
||||
data map[string]interface{},
|
||||
) {
|
||||
|
@ -89,7 +90,7 @@ func (t *Twemproxy) processStat(
|
|||
|
||||
// Process pool data in Twemproxy stats
|
||||
func (t *Twemproxy) processPool(
|
||||
acc inputs.Accumulator,
|
||||
acc telegraf.Accumulator,
|
||||
tags map[string]string,
|
||||
data map[string]interface{},
|
||||
) {
|
||||
|
@ -117,7 +118,7 @@ func (t *Twemproxy) processPool(
|
|||
|
||||
// Process backend server(redis/memcached) stats
|
||||
func (t *Twemproxy) processServer(
|
||||
acc inputs.Accumulator,
|
||||
acc telegraf.Accumulator,
|
||||
tags map[string]string,
|
||||
data map[string]interface{},
|
||||
) {
|
||||
|
@ -143,7 +144,7 @@ func copyTags(tags map[string]string) map[string]string {
|
|||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("twemproxy", func() inputs.Input {
|
||||
inputs.Add("twemproxy", func() telegraf.Input {
|
||||
return &Twemproxy{}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
)
|
||||
|
@ -68,7 +69,7 @@ func getTags(pools []poolInfo) map[string]string {
|
|||
return map[string]string{"pools": poolNames}
|
||||
}
|
||||
|
||||
func gatherPoolStats(pool poolInfo, acc inputs.Accumulator) error {
|
||||
func gatherPoolStats(pool poolInfo, acc telegraf.Accumulator) error {
|
||||
lines, err := internal.ReadLines(pool.ioFilename)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -101,7 +102,7 @@ func gatherPoolStats(pool poolInfo, acc inputs.Accumulator) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (z *Zfs) Gather(acc inputs.Accumulator) error {
|
||||
func (z *Zfs) Gather(acc telegraf.Accumulator) error {
|
||||
kstatMetrics := z.KstatMetrics
|
||||
if len(kstatMetrics) == 0 {
|
||||
kstatMetrics = []string{"arcstats", "zfetchstats", "vdev_cache_stats"}
|
||||
|
@ -149,7 +150,7 @@ func (z *Zfs) Gather(acc inputs.Accumulator) error {
|
|||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("zfs", func() inputs.Input {
|
||||
inputs.Add("zfs", func() telegraf.Input {
|
||||
return &Zfs{}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
)
|
||||
|
||||
|
@ -40,7 +41,7 @@ func (z *Zookeeper) Description() string {
|
|||
}
|
||||
|
||||
// Gather reads stats from all configured servers accumulates stats
|
||||
func (z *Zookeeper) Gather(acc inputs.Accumulator) error {
|
||||
func (z *Zookeeper) Gather(acc telegraf.Accumulator) error {
|
||||
if len(z.Servers) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
@ -53,7 +54,7 @@ func (z *Zookeeper) Gather(acc inputs.Accumulator) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (z *Zookeeper) gatherServer(address string, acc inputs.Accumulator) error {
|
||||
func (z *Zookeeper) gatherServer(address string, acc telegraf.Accumulator) error {
|
||||
_, _, err := net.SplitHostPort(address)
|
||||
if err != nil {
|
||||
address = address + ":2181"
|
||||
|
@ -103,7 +104,7 @@ func (z *Zookeeper) gatherServer(address string, acc inputs.Accumulator) error {
|
|||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("zookeeper", func() inputs.Input {
|
||||
inputs.Add("zookeeper", func() telegraf.Input {
|
||||
return &Zookeeper{}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
"github.com/influxdata/influxdb/client/v2"
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
"github.com/influxdata/telegraf/plugins/outputs"
|
||||
"github.com/influxdata/telegraf"
|
||||
)
|
||||
|
||||
type Amon struct {
|
||||
|
@ -151,7 +152,7 @@ func (a *Amon) Close() error {
|
|||
}
|
||||
|
||||
func init() {
|
||||
outputs.Add("amon", func() outputs.Output {
|
||||
outputs.Add("amon", func() telegraf.Output {
|
||||
return &Amon{}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -12,6 +12,7 @@ import (
|
|||
|
||||
"github.com/influxdata/influxdb/client/v2"
|
||||
"github.com/influxdata/telegraf/plugins/outputs"
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/streadway/amqp"
|
||||
)
|
||||
|
||||
|
@ -190,7 +191,7 @@ func (q *AMQP) Write(points []*client.Point) error {
|
|||
}
|
||||
|
||||
func init() {
|
||||
outputs.Add("amqp", func() outputs.Output {
|
||||
outputs.Add("amqp", func() telegraf.Output {
|
||||
return &AMQP{
|
||||
Database: DefaultDatabase,
|
||||
Precision: DefaultPrecision,
|
||||
|
|
|
@ -16,6 +16,7 @@ import (
|
|||
|
||||
"github.com/influxdata/influxdb/client/v2"
|
||||
"github.com/influxdata/telegraf/plugins/outputs"
|
||||
"github.com/influxdata/telegraf"
|
||||
)
|
||||
|
||||
type CloudWatch struct {
|
||||
|
@ -230,7 +231,7 @@ func BuildDimensions(ptTags map[string]string) []*cloudwatch.Dimension {
|
|||
}
|
||||
|
||||
func init() {
|
||||
outputs.Add("cloudwatch", func() outputs.Output {
|
||||
outputs.Add("cloudwatch", func() telegraf.Output {
|
||||
return &CloudWatch{}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -13,6 +13,7 @@ import (
|
|||
"github.com/influxdata/influxdb/client/v2"
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
"github.com/influxdata/telegraf/plugins/outputs"
|
||||
"github.com/influxdata/telegraf"
|
||||
)
|
||||
|
||||
type Datadog struct {
|
||||
|
@ -173,7 +174,7 @@ func (d *Datadog) Close() error {
|
|||
}
|
||||
|
||||
func init() {
|
||||
outputs.Add("datadog", func() outputs.Output {
|
||||
outputs.Add("datadog", func() telegraf.Output {
|
||||
return NewDatadog(datadog_api)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"fmt"
|
||||
"github.com/influxdata/influxdb/client/v2"
|
||||
"github.com/influxdata/telegraf/plugins/outputs"
|
||||
"github.com/influxdata/telegraf"
|
||||
"log"
|
||||
"math/rand"
|
||||
"net"
|
||||
|
@ -128,7 +129,7 @@ func (g *Graphite) Write(points []*client.Point) error {
|
|||
}
|
||||
|
||||
func init() {
|
||||
outputs.Add("graphite", func() outputs.Output {
|
||||
outputs.Add("graphite", func() telegraf.Output {
|
||||
return &Graphite{}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -12,6 +12,7 @@ import (
|
|||
"github.com/influxdata/influxdb/client/v2"
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
"github.com/influxdata/telegraf/plugins/outputs"
|
||||
"github.com/influxdata/telegraf"
|
||||
)
|
||||
|
||||
type InfluxDB struct {
|
||||
|
@ -156,7 +157,7 @@ func (i *InfluxDB) Write(points []*client.Point) error {
|
|||
}
|
||||
|
||||
func init() {
|
||||
outputs.Add("influxdb", func() outputs.Output {
|
||||
outputs.Add("influxdb", func() telegraf.Output {
|
||||
return &InfluxDB{}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"github.com/Shopify/sarama"
|
||||
"github.com/influxdata/influxdb/client/v2"
|
||||
"github.com/influxdata/telegraf/plugins/outputs"
|
||||
"github.com/influxdata/telegraf"
|
||||
"io/ioutil"
|
||||
)
|
||||
|
||||
|
@ -140,7 +141,7 @@ func (k *Kafka) Write(points []*client.Point) error {
|
|||
}
|
||||
|
||||
func init() {
|
||||
outputs.Add("kafka", func() outputs.Output {
|
||||
outputs.Add("kafka", func() telegraf.Output {
|
||||
return &Kafka{}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -16,6 +16,7 @@ import (
|
|||
|
||||
"github.com/influxdata/influxdb/client/v2"
|
||||
"github.com/influxdata/telegraf/plugins/outputs"
|
||||
"github.com/influxdata/telegraf"
|
||||
)
|
||||
|
||||
type KinesisOutput struct {
|
||||
|
@ -172,7 +173,7 @@ func (k *KinesisOutput) Write(points []*client.Point) error {
|
|||
}
|
||||
|
||||
func init() {
|
||||
outputs.Add("kinesis", func() outputs.Output {
|
||||
outputs.Add("kinesis", func() telegraf.Output {
|
||||
return &KinesisOutput{}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
"github.com/influxdata/influxdb/client/v2"
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
"github.com/influxdata/telegraf/plugins/outputs"
|
||||
"github.com/influxdata/telegraf"
|
||||
)
|
||||
|
||||
type Librato struct {
|
||||
|
@ -169,7 +170,7 @@ func (l *Librato) Close() error {
|
|||
}
|
||||
|
||||
func init() {
|
||||
outputs.Add("librato", func() outputs.Output {
|
||||
outputs.Add("librato", func() telegraf.Output {
|
||||
return NewLibrato(librato_api)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -13,6 +13,7 @@ import (
|
|||
"github.com/influxdata/influxdb/client/v2"
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
"github.com/influxdata/telegraf/plugins/outputs"
|
||||
"github.com/influxdata/telegraf"
|
||||
)
|
||||
|
||||
const MaxClientIdLen = 8
|
||||
|
@ -184,7 +185,7 @@ func getCertPool(pemPath string) (*x509.CertPool, error) {
|
|||
}
|
||||
|
||||
func init() {
|
||||
outputs.Add("mqtt", func() outputs.Output {
|
||||
outputs.Add("mqtt", func() telegraf.Output {
|
||||
return &MQTT{}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"fmt"
|
||||
"github.com/influxdata/influxdb/client/v2"
|
||||
"github.com/influxdata/telegraf/plugins/outputs"
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/nsqio/go-nsq"
|
||||
)
|
||||
|
||||
|
@ -65,7 +66,7 @@ func (n *NSQ) Write(points []*client.Point) error {
|
|||
}
|
||||
|
||||
func init() {
|
||||
outputs.Add("nsq", func() outputs.Output {
|
||||
outputs.Add("nsq", func() telegraf.Output {
|
||||
return &NSQ{}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
|
||||
"github.com/influxdata/influxdb/client/v2"
|
||||
"github.com/influxdata/telegraf/plugins/outputs"
|
||||
"github.com/influxdata/telegraf"
|
||||
)
|
||||
|
||||
type OpenTSDB struct {
|
||||
|
@ -162,7 +163,7 @@ func (o *OpenTSDB) Close() error {
|
|||
}
|
||||
|
||||
func init() {
|
||||
outputs.Add("opentsdb", func() outputs.Output {
|
||||
outputs.Add("opentsdb", func() telegraf.Output {
|
||||
return &OpenTSDB{}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
|
||||
"github.com/influxdata/influxdb/client/v2"
|
||||
"github.com/influxdata/telegraf/plugins/outputs"
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
|
@ -119,7 +120,7 @@ func (p *PrometheusClient) Write(points []*client.Point) error {
|
|||
}
|
||||
|
||||
func init() {
|
||||
outputs.Add("prometheus_client", func() outputs.Output {
|
||||
outputs.Add("prometheus_client", func() telegraf.Output {
|
||||
return &PrometheusClient{}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -1,40 +1,10 @@
|
|||
package outputs
|
||||
|
||||
import (
|
||||
"github.com/influxdata/influxdb/client/v2"
|
||||
"github.com/influxdata/telegraf"
|
||||
)
|
||||
|
||||
type Output interface {
|
||||
// Connect to the Output
|
||||
Connect() error
|
||||
// Close any connections to the Output
|
||||
Close() error
|
||||
// Description returns a one-sentence description on the Output
|
||||
Description() string
|
||||
// SampleConfig returns the default configuration of the Output
|
||||
SampleConfig() string
|
||||
// Write takes in group of points to be written to the Output
|
||||
Write(points []*client.Point) error
|
||||
}
|
||||
|
||||
type ServiceOutput interface {
|
||||
// Connect to the Output
|
||||
Connect() error
|
||||
// Close any connections to the Output
|
||||
Close() error
|
||||
// Description returns a one-sentence description on the Output
|
||||
Description() string
|
||||
// SampleConfig returns the default configuration of the Output
|
||||
SampleConfig() string
|
||||
// Write takes in group of points to be written to the Output
|
||||
Write(points []*client.Point) error
|
||||
// Start the "service" that will provide an Output
|
||||
Start() error
|
||||
// Stop the "service" that will provide an Output
|
||||
Stop()
|
||||
}
|
||||
|
||||
type Creator func() Output
|
||||
type Creator func() telegraf.Output
|
||||
|
||||
var Outputs = map[string]Creator{}
|
||||
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"github.com/amir/raidman"
|
||||
"github.com/influxdata/influxdb/client/v2"
|
||||
"github.com/influxdata/telegraf/plugins/outputs"
|
||||
"github.com/influxdata/telegraf"
|
||||
)
|
||||
|
||||
type Riemann struct {
|
||||
|
@ -95,7 +96,7 @@ func buildEvents(p *client.Point) []*raidman.Event {
|
|||
}
|
||||
|
||||
func init() {
|
||||
outputs.Add("riemann", func() outputs.Output {
|
||||
outputs.Add("riemann", func() telegraf.Output {
|
||||
return &Riemann{}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -84,23 +84,6 @@ func (a *Accumulator) AddFields(
|
|||
a.Points = append(a.Points, p)
|
||||
}
|
||||
|
||||
func (a *Accumulator) SetDefaultTags(tags map[string]string) {
|
||||
// stub for implementing Accumulator interface.
|
||||
}
|
||||
|
||||
func (a *Accumulator) AddDefaultTag(key, value string) {
|
||||
// stub for implementing Accumulator interface.
|
||||
}
|
||||
|
||||
func (a *Accumulator) Prefix() string {
|
||||
// stub for implementing Accumulator interface.
|
||||
return ""
|
||||
}
|
||||
|
||||
func (a *Accumulator) SetPrefix(prefix string) {
|
||||
// stub for implementing Accumulator interface.
|
||||
}
|
||||
|
||||
func (a *Accumulator) Debug() bool {
|
||||
// stub for implementing Accumulator interface.
|
||||
return a.debug
|
||||
|
|
Loading…
Reference in New Issue