Implement telegraf collecting stats on itself

closes #1348
This commit is contained in:
Cameron Sparr 2016-11-07 08:34:46 +00:00
parent d518d7d806
commit d71a42cd1b
26 changed files with 975 additions and 169 deletions

View File

@ -10,6 +10,7 @@
- [#2074](https://github.com/influxdata/telegraf/pull/2074): "discard" output plugin added, primarily for testing purposes. - [#2074](https://github.com/influxdata/telegraf/pull/2074): "discard" output plugin added, primarily for testing purposes.
- [#1965](https://github.com/influxdata/telegraf/pull/1965): The JSON parser can now parse an array of objects using the same configuration. - [#1965](https://github.com/influxdata/telegraf/pull/1965): The JSON parser can now parse an array of objects using the same configuration.
- [#1807](https://github.com/influxdata/telegraf/pull/1807): Option to use device name rather than path for reporting disk stats. - [#1807](https://github.com/influxdata/telegraf/pull/1807): Option to use device name rather than path for reporting disk stats.
- [#1348](https://github.com/influxdata/telegraf/issues/1348): Telegraf "internal" plugin for collecting stats on itself.
### Bugfixes ### Bugfixes

View File

@ -159,6 +159,7 @@ configuration options.
* [hddtemp](./plugins/inputs/hddtemp) * [hddtemp](./plugins/inputs/hddtemp)
* [http_response](./plugins/inputs/http_response) * [http_response](./plugins/inputs/http_response)
* [httpjson](./plugins/inputs/httpjson) (generic JSON-emitting http service plugin) * [httpjson](./plugins/inputs/httpjson) (generic JSON-emitting http service plugin)
* [internal](./plugins/inputs/internal)
* [influxdb](./plugins/inputs/influxdb) * [influxdb](./plugins/inputs/influxdb)
* [ipmi_sensor](./plugins/inputs/ipmi_sensor) * [ipmi_sensor](./plugins/inputs/ipmi_sensor)
* [iptables](./plugins/inputs/iptables) * [iptables](./plugins/inputs/iptables)

View File

@ -2,10 +2,14 @@ package agent
import ( import (
"log" "log"
"sync/atomic"
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/selfstat"
)
var (
NErrors = selfstat.Register("agent", "gather_errors", map[string]string{})
) )
type MetricMaker interface { type MetricMaker interface {
@ -37,8 +41,6 @@ type accumulator struct {
maker MetricMaker maker MetricMaker
precision time.Duration precision time.Duration
errCount uint64
} }
func (ac *accumulator) AddFields( func (ac *accumulator) AddFields(
@ -80,7 +82,7 @@ func (ac *accumulator) AddError(err error) {
if err == nil { if err == nil {
return return
} }
atomic.AddUint64(&ac.errCount, 1) NErrors.Incr(1)
//TODO suppress/throttle consecutive duplicate errors? //TODO suppress/throttle consecutive duplicate errors?
log.Printf("E! Error in plugin [%s]: %s", ac.maker.Name(), err) log.Printf("E! Error in plugin [%s]: %s", ac.maker.Name(), err)
} }

View File

@ -88,7 +88,7 @@ func TestAccAddError(t *testing.T) {
a.AddError(fmt.Errorf("baz")) a.AddError(fmt.Errorf("baz"))
errs := bytes.Split(errBuf.Bytes(), []byte{'\n'}) errs := bytes.Split(errBuf.Bytes(), []byte{'\n'})
assert.EqualValues(t, 3, a.errCount) assert.EqualValues(t, int64(3), NErrors.Get())
require.Len(t, errs, 4) // 4 because of trailing newline require.Len(t, errs, 4) // 4 because of trailing newline
assert.Contains(t, string(errs[0]), "TestPlugin") assert.Contains(t, string(errs[0]), "TestPlugin")
assert.Contains(t, string(errs[0]), "foo") assert.Contains(t, string(errs[0]), "foo")

View File

@ -12,6 +12,7 @@ import (
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/config" "github.com/influxdata/telegraf/internal/config"
"github.com/influxdata/telegraf/internal/models" "github.com/influxdata/telegraf/internal/models"
"github.com/influxdata/telegraf/selfstat"
) )
// Agent runs telegraf and collects data based on the given config // Agent runs telegraf and collects data based on the given config
@ -44,8 +45,6 @@ func NewAgent(config *config.Config) (*Agent, error) {
// Connect connects to all configured outputs // Connect connects to all configured outputs
func (a *Agent) Connect() error { func (a *Agent) Connect() error {
for _, o := range a.Config.Outputs { for _, o := range a.Config.Outputs {
o.Quiet = a.Config.Agent.Quiet
switch ot := o.Output.(type) { switch ot := o.Output.(type) {
case telegraf.ServiceOutput: case telegraf.ServiceOutput:
if err := ot.Start(); err != nil { if err := ot.Start(); err != nil {
@ -106,24 +105,26 @@ func (a *Agent) gatherer(
) { ) {
defer panicRecover(input) defer panicRecover(input)
GatherTime := selfstat.RegisterTiming("gather",
"gather_time_ns",
map[string]string{"input": input.Config.Name},
)
acc := NewAccumulator(input, metricC)
acc.SetPrecision(a.Config.Agent.Precision.Duration,
a.Config.Agent.Interval.Duration)
ticker := time.NewTicker(interval) ticker := time.NewTicker(interval)
defer ticker.Stop() defer ticker.Stop()
for { for {
acc := NewAccumulator(input, metricC)
acc.SetPrecision(a.Config.Agent.Precision.Duration,
a.Config.Agent.Interval.Duration)
input.SetDebug(a.Config.Agent.Debug)
input.SetDefaultTags(a.Config.Tags)
internal.RandomSleep(a.Config.Agent.CollectionJitter.Duration, shutdown) internal.RandomSleep(a.Config.Agent.CollectionJitter.Duration, shutdown)
start := time.Now() start := time.Now()
gatherWithTimeout(shutdown, input, acc, interval) gatherWithTimeout(shutdown, input, acc, interval)
elapsed := time.Since(start) elapsed := time.Since(start)
log.Printf("D! Input [%s] gathered metrics, (%s interval) in %s\n", GatherTime.Incr(elapsed.Nanoseconds())
input.Name(), interval, elapsed)
select { select {
case <-shutdown: case <-shutdown:
@ -204,9 +205,6 @@ func (a *Agent) Test() error {
if err := input.Input.Gather(acc); err != nil { if err := input.Input.Gather(acc); err != nil {
return err return err
} }
if acc.errCount > 0 {
return fmt.Errorf("Errors encountered during processing")
}
// Special instructions for some inputs. cpu, for example, needs to be // Special instructions for some inputs. cpu, for example, needs to be
// run twice in order to return cpu usage percentages. // run twice in order to return cpu usage percentages.
@ -327,13 +325,13 @@ func (a *Agent) Run(shutdown chan struct{}) error {
// Start all ServicePlugins // Start all ServicePlugins
for _, input := range a.Config.Inputs { for _, input := range a.Config.Inputs {
input.SetDefaultTags(a.Config.Tags)
switch p := input.Input.(type) { switch p := input.Input.(type) {
case telegraf.ServiceInput: case telegraf.ServiceInput:
acc := NewAccumulator(input, metricC) acc := NewAccumulator(input, metricC)
// Service input plugins should set their own precision of their // Service input plugins should set their own precision of their
// metrics. // metrics.
acc.SetPrecision(time.Nanosecond, 0) acc.SetPrecision(time.Nanosecond, 0)
input.SetDefaultTags(a.Config.Tags)
if err := p.Start(acc); err != nil { if err := p.Start(acc); err != nil {
log.Printf("E! Service for input %s failed to start, exiting\n%s\n", log.Printf("E! Service for input %s failed to start, exiting\n%s\n",
input.Name(), err.Error()) input.Name(), err.Error())

View File

@ -4,15 +4,17 @@ import (
"sync" "sync"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/selfstat"
)
var (
MetricsWritten = selfstat.Register("agent", "metrics_written", map[string]string{})
MetricsDropped = selfstat.Register("agent", "metrics_dropped", map[string]string{})
) )
// Buffer is an object for storing metrics in a circular buffer. // Buffer is an object for storing metrics in a circular buffer.
type Buffer struct { type Buffer struct {
buf chan telegraf.Metric buf chan telegraf.Metric
// total dropped metrics
drops int
// total metrics added
total int
mu sync.Mutex mu sync.Mutex
} }
@ -36,25 +38,14 @@ func (b *Buffer) Len() int {
return len(b.buf) return len(b.buf)
} }
// Drops returns the total number of dropped metrics that have occured in this
// buffer since instantiation.
func (b *Buffer) Drops() int {
return b.drops
}
// Total returns the total number of metrics that have been added to this buffer.
func (b *Buffer) Total() int {
return b.total
}
// Add adds metrics to the buffer. // Add adds metrics to the buffer.
func (b *Buffer) Add(metrics ...telegraf.Metric) { func (b *Buffer) Add(metrics ...telegraf.Metric) {
for i, _ := range metrics { for i, _ := range metrics {
b.total++ MetricsWritten.Incr(1)
select { select {
case b.buf <- metrics[i]: case b.buf <- metrics[i]:
default: default:
b.drops++ MetricsDropped.Incr(1)
<-b.buf <-b.buf
b.buf <- metrics[i] b.buf <- metrics[i]
} }

View File

@ -27,47 +27,53 @@ func BenchmarkAddMetrics(b *testing.B) {
func TestNewBufferBasicFuncs(t *testing.T) { func TestNewBufferBasicFuncs(t *testing.T) {
b := NewBuffer(10) b := NewBuffer(10)
MetricsDropped.Set(0)
MetricsWritten.Set(0)
assert.True(t, b.IsEmpty()) assert.True(t, b.IsEmpty())
assert.Zero(t, b.Len()) assert.Zero(t, b.Len())
assert.Zero(t, b.Drops()) assert.Zero(t, MetricsDropped.Get())
assert.Zero(t, b.Total()) assert.Zero(t, MetricsWritten.Get())
m := testutil.TestMetric(1, "mymetric") m := testutil.TestMetric(1, "mymetric")
b.Add(m) b.Add(m)
assert.False(t, b.IsEmpty()) assert.False(t, b.IsEmpty())
assert.Equal(t, b.Len(), 1) assert.Equal(t, b.Len(), 1)
assert.Equal(t, b.Drops(), 0) assert.Equal(t, int64(0), MetricsDropped.Get())
assert.Equal(t, b.Total(), 1) assert.Equal(t, int64(1), MetricsWritten.Get())
b.Add(metricList...) b.Add(metricList...)
assert.False(t, b.IsEmpty()) assert.False(t, b.IsEmpty())
assert.Equal(t, b.Len(), 6) assert.Equal(t, b.Len(), 6)
assert.Equal(t, b.Drops(), 0) assert.Equal(t, int64(0), MetricsDropped.Get())
assert.Equal(t, b.Total(), 6) assert.Equal(t, int64(6), MetricsWritten.Get())
} }
func TestDroppingMetrics(t *testing.T) { func TestDroppingMetrics(t *testing.T) {
b := NewBuffer(10) b := NewBuffer(10)
MetricsDropped.Set(0)
MetricsWritten.Set(0)
// Add up to the size of the buffer // Add up to the size of the buffer
b.Add(metricList...) b.Add(metricList...)
b.Add(metricList...) b.Add(metricList...)
assert.False(t, b.IsEmpty()) assert.False(t, b.IsEmpty())
assert.Equal(t, b.Len(), 10) assert.Equal(t, b.Len(), 10)
assert.Equal(t, b.Drops(), 0) assert.Equal(t, int64(0), MetricsDropped.Get())
assert.Equal(t, b.Total(), 10) assert.Equal(t, int64(10), MetricsWritten.Get())
// Add 5 more and verify they were dropped // Add 5 more and verify they were dropped
b.Add(metricList...) b.Add(metricList...)
assert.False(t, b.IsEmpty()) assert.False(t, b.IsEmpty())
assert.Equal(t, b.Len(), 10) assert.Equal(t, b.Len(), 10)
assert.Equal(t, b.Drops(), 5) assert.Equal(t, int64(5), MetricsDropped.Get())
assert.Equal(t, b.Total(), 15) assert.Equal(t, int64(15), MetricsWritten.Get())
} }
func TestGettingBatches(t *testing.T) { func TestGettingBatches(t *testing.T) {
b := NewBuffer(20) b := NewBuffer(20)
MetricsDropped.Set(0)
MetricsWritten.Set(0)
// Verify that the buffer returned is smaller than requested when there are // Verify that the buffer returned is smaller than requested when there are
// not as many items as requested. // not as many items as requested.
@ -78,8 +84,8 @@ func TestGettingBatches(t *testing.T) {
// Verify that the buffer is now empty // Verify that the buffer is now empty
assert.True(t, b.IsEmpty()) assert.True(t, b.IsEmpty())
assert.Zero(t, b.Len()) assert.Zero(t, b.Len())
assert.Zero(t, b.Drops()) assert.Zero(t, MetricsDropped.Get())
assert.Equal(t, b.Total(), 5) assert.Equal(t, int64(5), MetricsWritten.Get())
// Verify that the buffer returned is not more than the size requested // Verify that the buffer returned is not more than the size requested
b.Add(metricList...) b.Add(metricList...)
@ -89,6 +95,6 @@ func TestGettingBatches(t *testing.T) {
// Verify that buffer is not empty // Verify that buffer is not empty
assert.False(t, b.IsEmpty()) assert.False(t, b.IsEmpty())
assert.Equal(t, b.Len(), 2) assert.Equal(t, b.Len(), 2)
assert.Equal(t, b.Drops(), 0) assert.Equal(t, int64(0), MetricsDropped.Get())
assert.Equal(t, b.Total(), 10) assert.Equal(t, int64(10), MetricsWritten.Get())
} }

View File

@ -821,10 +821,7 @@ func (c *Config) addInput(name string, table *ast.Table) error {
return err return err
} }
rp := &models.RunningInput{ rp := models.NewRunningInput(input, pluginConfig)
Input: input,
Config: pluginConfig,
}
c.Inputs = append(c.Inputs, rp) c.Inputs = append(c.Inputs, rp)
return nil return nil
} }

View File

@ -32,7 +32,6 @@ func makemetric(
daemonTags map[string]string, daemonTags map[string]string,
filter Filter, filter Filter,
applyFilter bool, applyFilter bool,
debug bool,
mType telegraf.ValueType, mType telegraf.ValueType,
t time.Time, t time.Time,
) telegraf.Metric { ) telegraf.Metric {
@ -123,11 +122,9 @@ func makemetric(
case float64: case float64:
// NaNs are invalid values in influxdb, skip measurement // NaNs are invalid values in influxdb, skip measurement
if math.IsNaN(val) || math.IsInf(val, 0) { if math.IsNaN(val) || math.IsInf(val, 0) {
if debug { log.Printf("D! Measurement [%s] field [%s] has a NaN or Inf "+
log.Printf("Measurement [%s] field [%s] has a NaN or Inf "+
"field, skipping", "field, skipping",
measurement, k) measurement, k)
}
delete(fields, k) delete(fields, k)
continue continue
} }

View File

@ -66,7 +66,6 @@ func (r *RunningAggregator) MakeMetric(
nil, nil,
r.Config.Filter, r.Config.Filter,
false, false,
false,
mType, mType,
t, t,
) )

View File

@ -5,15 +5,34 @@ import (
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/selfstat"
) )
var GlobalMetricsGathered = selfstat.Register("agent", "metrics_gathered", map[string]string{})
type RunningInput struct { type RunningInput struct {
Input telegraf.Input Input telegraf.Input
Config *InputConfig Config *InputConfig
trace bool trace bool
debug bool
defaultTags map[string]string defaultTags map[string]string
MetricsGathered selfstat.Stat
}
func NewRunningInput(
input telegraf.Input,
config *InputConfig,
) *RunningInput {
return &RunningInput{
Input: input,
Config: config,
MetricsGathered: selfstat.Register(
"gather",
"metrics_gathered",
map[string]string{"input": config.Name},
),
}
} }
// InputConfig containing a name, interval, and filter // InputConfig containing a name, interval, and filter
@ -51,7 +70,6 @@ func (r *RunningInput) MakeMetric(
r.defaultTags, r.defaultTags,
r.Config.Filter, r.Config.Filter,
true, true,
r.debug,
mType, mType,
t, t,
) )
@ -60,17 +78,11 @@ func (r *RunningInput) MakeMetric(
fmt.Println("> " + m.String()) fmt.Println("> " + m.String())
} }
r.MetricsGathered.Incr(1)
GlobalMetricsGathered.Incr(1)
return m return m
} }
func (r *RunningInput) Debug() bool {
return r.debug
}
func (r *RunningInput) SetDebug(debug bool) {
r.debug = debug
}
func (r *RunningInput) Trace() bool { func (r *RunningInput) Trace() bool {
return r.trace return r.trace
} }

View File

@ -13,11 +13,9 @@ import (
func TestMakeMetricNoFields(t *testing.T) { func TestMakeMetricNoFields(t *testing.T) {
now := time.Now() now := time.Now()
ri := RunningInput{ ri := NewRunningInput(&testInput{}, &InputConfig{
Config: &InputConfig{
Name: "TestRunningInput", Name: "TestRunningInput",
}, })
}
m := ri.MakeMetric( m := ri.MakeMetric(
"RITest", "RITest",
@ -32,11 +30,9 @@ func TestMakeMetricNoFields(t *testing.T) {
// nil fields should get dropped // nil fields should get dropped
func TestMakeMetricNilFields(t *testing.T) { func TestMakeMetricNilFields(t *testing.T) {
now := time.Now() now := time.Now()
ri := RunningInput{ ri := NewRunningInput(&testInput{}, &InputConfig{
Config: &InputConfig{
Name: "TestRunningInput", Name: "TestRunningInput",
}, })
}
m := ri.MakeMetric( m := ri.MakeMetric(
"RITest", "RITest",
@ -58,13 +54,10 @@ func TestMakeMetricNilFields(t *testing.T) {
// make an untyped, counter, & gauge metric // make an untyped, counter, & gauge metric
func TestMakeMetric(t *testing.T) { func TestMakeMetric(t *testing.T) {
now := time.Now() now := time.Now()
ri := RunningInput{ ri := NewRunningInput(&testInput{}, &InputConfig{
Config: &InputConfig{
Name: "TestRunningInput", Name: "TestRunningInput",
}, })
}
ri.SetDebug(true)
assert.Equal(t, true, ri.Debug())
ri.SetTrace(true) ri.SetTrace(true)
assert.Equal(t, true, ri.Trace()) assert.Equal(t, true, ri.Trace())
assert.Equal(t, "inputs.TestRunningInput", ri.Name()) assert.Equal(t, "inputs.TestRunningInput", ri.Name())
@ -126,16 +119,13 @@ func TestMakeMetric(t *testing.T) {
func TestMakeMetricWithPluginTags(t *testing.T) { func TestMakeMetricWithPluginTags(t *testing.T) {
now := time.Now() now := time.Now()
ri := RunningInput{ ri := NewRunningInput(&testInput{}, &InputConfig{
Config: &InputConfig{
Name: "TestRunningInput", Name: "TestRunningInput",
Tags: map[string]string{ Tags: map[string]string{
"foo": "bar", "foo": "bar",
}, },
}, })
}
ri.SetDebug(true)
assert.Equal(t, true, ri.Debug())
ri.SetTrace(true) ri.SetTrace(true)
assert.Equal(t, true, ri.Trace()) assert.Equal(t, true, ri.Trace())
@ -155,17 +145,14 @@ func TestMakeMetricWithPluginTags(t *testing.T) {
func TestMakeMetricFilteredOut(t *testing.T) { func TestMakeMetricFilteredOut(t *testing.T) {
now := time.Now() now := time.Now()
ri := RunningInput{ ri := NewRunningInput(&testInput{}, &InputConfig{
Config: &InputConfig{
Name: "TestRunningInput", Name: "TestRunningInput",
Tags: map[string]string{ Tags: map[string]string{
"foo": "bar", "foo": "bar",
}, },
Filter: Filter{NamePass: []string{"foobar"}}, Filter: Filter{NamePass: []string{"foobar"}},
}, })
}
ri.SetDebug(true)
assert.Equal(t, true, ri.Debug())
ri.SetTrace(true) ri.SetTrace(true)
assert.Equal(t, true, ri.Trace()) assert.Equal(t, true, ri.Trace())
assert.NoError(t, ri.Config.Filter.Compile()) assert.NoError(t, ri.Config.Filter.Compile())
@ -182,16 +169,13 @@ func TestMakeMetricFilteredOut(t *testing.T) {
func TestMakeMetricWithDaemonTags(t *testing.T) { func TestMakeMetricWithDaemonTags(t *testing.T) {
now := time.Now() now := time.Now()
ri := RunningInput{ ri := NewRunningInput(&testInput{}, &InputConfig{
Config: &InputConfig{
Name: "TestRunningInput", Name: "TestRunningInput",
}, })
}
ri.SetDefaultTags(map[string]string{ ri.SetDefaultTags(map[string]string{
"foo": "bar", "foo": "bar",
}) })
ri.SetDebug(true)
assert.Equal(t, true, ri.Debug())
ri.SetTrace(true) ri.SetTrace(true)
assert.Equal(t, true, ri.Trace()) assert.Equal(t, true, ri.Trace())
@ -214,13 +198,10 @@ func TestMakeMetricInfFields(t *testing.T) {
inf := math.Inf(1) inf := math.Inf(1)
ninf := math.Inf(-1) ninf := math.Inf(-1)
now := time.Now() now := time.Now()
ri := RunningInput{ ri := NewRunningInput(&testInput{}, &InputConfig{
Config: &InputConfig{
Name: "TestRunningInput", Name: "TestRunningInput",
}, })
}
ri.SetDebug(true)
assert.Equal(t, true, ri.Debug())
ri.SetTrace(true) ri.SetTrace(true)
assert.Equal(t, true, ri.Trace()) assert.Equal(t, true, ri.Trace())
@ -244,13 +225,10 @@ func TestMakeMetricInfFields(t *testing.T) {
func TestMakeMetricAllFieldTypes(t *testing.T) { func TestMakeMetricAllFieldTypes(t *testing.T) {
now := time.Now() now := time.Now()
ri := RunningInput{ ri := NewRunningInput(&testInput{}, &InputConfig{
Config: &InputConfig{
Name: "TestRunningInput", Name: "TestRunningInput",
}, })
}
ri.SetDebug(true)
assert.Equal(t, true, ri.Debug())
ri.SetTrace(true) ri.SetTrace(true)
assert.Equal(t, true, ri.Trace()) assert.Equal(t, true, ri.Trace())
@ -293,12 +271,10 @@ func TestMakeMetricAllFieldTypes(t *testing.T) {
func TestMakeMetricNameOverride(t *testing.T) { func TestMakeMetricNameOverride(t *testing.T) {
now := time.Now() now := time.Now()
ri := RunningInput{ ri := NewRunningInput(&testInput{}, &InputConfig{
Config: &InputConfig{
Name: "TestRunningInput", Name: "TestRunningInput",
NameOverride: "foobar", NameOverride: "foobar",
}, })
}
m := ri.MakeMetric( m := ri.MakeMetric(
"RITest", "RITest",
@ -316,12 +292,10 @@ func TestMakeMetricNameOverride(t *testing.T) {
func TestMakeMetricNamePrefix(t *testing.T) { func TestMakeMetricNamePrefix(t *testing.T) {
now := time.Now() now := time.Now()
ri := RunningInput{ ri := NewRunningInput(&testInput{}, &InputConfig{
Config: &InputConfig{
Name: "TestRunningInput", Name: "TestRunningInput",
MeasurementPrefix: "foobar_", MeasurementPrefix: "foobar_",
}, })
}
m := ri.MakeMetric( m := ri.MakeMetric(
"RITest", "RITest",
@ -339,12 +313,10 @@ func TestMakeMetricNamePrefix(t *testing.T) {
func TestMakeMetricNameSuffix(t *testing.T) { func TestMakeMetricNameSuffix(t *testing.T) {
now := time.Now() now := time.Now()
ri := RunningInput{ ri := NewRunningInput(&testInput{}, &InputConfig{
Config: &InputConfig{
Name: "TestRunningInput", Name: "TestRunningInput",
MeasurementSuffix: "_foobar", MeasurementSuffix: "_foobar",
}, })
}
m := ri.MakeMetric( m := ri.MakeMetric(
"RITest", "RITest",
@ -359,3 +331,9 @@ func TestMakeMetricNameSuffix(t *testing.T) {
m.String(), m.String(),
) )
} }
type testInput struct{}
func (t *testInput) Description() string { return "" }
func (t *testInput) SampleConfig() string { return "" }
func (t *testInput) Gather(acc telegraf.Accumulator) error { return nil }

View File

@ -7,6 +7,7 @@ import (
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/buffer" "github.com/influxdata/telegraf/internal/buffer"
"github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/selfstat"
) )
const ( const (
@ -22,10 +23,15 @@ type RunningOutput struct {
Name string Name string
Output telegraf.Output Output telegraf.Output
Config *OutputConfig Config *OutputConfig
Quiet bool
MetricBufferLimit int MetricBufferLimit int
MetricBatchSize int MetricBatchSize int
MetricsFiltered selfstat.Stat
MetricsWritten selfstat.Stat
BufferSize selfstat.Stat
BufferLimit selfstat.Stat
WriteTime selfstat.Stat
metrics *buffer.Buffer metrics *buffer.Buffer
failMetrics *buffer.Buffer failMetrics *buffer.Buffer
} }
@ -51,7 +57,33 @@ func NewRunningOutput(
Config: conf, Config: conf,
MetricBufferLimit: bufferLimit, MetricBufferLimit: bufferLimit,
MetricBatchSize: batchSize, MetricBatchSize: batchSize,
MetricsWritten: selfstat.Register(
"write",
"metrics_written",
map[string]string{"output": name},
),
MetricsFiltered: selfstat.Register(
"write",
"metrics_filtered",
map[string]string{"output": name},
),
BufferSize: selfstat.Register(
"write",
"buffer_size",
map[string]string{"output": name},
),
BufferLimit: selfstat.Register(
"write",
"buffer_limit",
map[string]string{"output": name},
),
WriteTime: selfstat.RegisterTiming(
"write",
"write_time_ns",
map[string]string{"output": name},
),
} }
ro.BufferLimit.Incr(int64(ro.MetricBufferLimit))
return ro return ro
} }
@ -67,6 +99,7 @@ func (ro *RunningOutput) AddMetric(m telegraf.Metric) {
fields := m.Fields() fields := m.Fields()
t := m.Time() t := m.Time()
if ok := ro.Config.Filter.Apply(name, fields, tags); !ok { if ok := ro.Config.Filter.Apply(name, fields, tags); !ok {
ro.MetricsFiltered.Incr(1)
return return
} }
// error is not possible if creating from another metric, so ignore. // error is not possible if creating from another metric, so ignore.
@ -85,28 +118,21 @@ func (ro *RunningOutput) AddMetric(m telegraf.Metric) {
// Write writes all cached points to this output. // Write writes all cached points to this output.
func (ro *RunningOutput) Write() error { func (ro *RunningOutput) Write() error {
if !ro.Quiet { nFails, nMetrics := ro.failMetrics.Len(), ro.metrics.Len()
log.Printf("I! Output [%s] buffer fullness: %d / %d metrics. "+ log.Printf("D! Output [%s] buffer fullness: %d / %d metrics. ",
"Total gathered metrics: %d. Total dropped metrics: %d.", ro.Name, nFails+nMetrics, ro.MetricBufferLimit)
ro.Name, ro.BufferSize.Incr(int64(nFails + nMetrics))
ro.failMetrics.Len()+ro.metrics.Len(),
ro.MetricBufferLimit,
ro.metrics.Total(),
ro.metrics.Drops()+ro.failMetrics.Drops())
}
var err error var err error
if !ro.failMetrics.IsEmpty() { if !ro.failMetrics.IsEmpty() {
bufLen := ro.failMetrics.Len()
// how many batches of failed writes we need to write. // how many batches of failed writes we need to write.
nBatches := bufLen/ro.MetricBatchSize + 1 nBatches := nFails/ro.MetricBatchSize + 1
batchSize := ro.MetricBatchSize batchSize := ro.MetricBatchSize
for i := 0; i < nBatches; i++ { for i := 0; i < nBatches; i++ {
// If it's the last batch, only grab the metrics that have not had // If it's the last batch, only grab the metrics that have not had
// a write attempt already (this is primarily to preserve order). // a write attempt already (this is primarily to preserve order).
if i == nBatches-1 { if i == nBatches-1 {
batchSize = bufLen % ro.MetricBatchSize batchSize = nFails % ro.MetricBatchSize
} }
batch := ro.failMetrics.Batch(batchSize) batch := ro.failMetrics.Batch(batchSize)
// If we've already failed previous writes, don't bother trying to // If we've already failed previous writes, don't bother trying to
@ -127,6 +153,7 @@ func (ro *RunningOutput) Write() error {
if err == nil { if err == nil {
err = ro.write(batch) err = ro.write(batch)
} }
if err != nil { if err != nil {
ro.failMetrics.Add(batch...) ro.failMetrics.Add(batch...)
return err return err
@ -135,17 +162,19 @@ func (ro *RunningOutput) Write() error {
} }
func (ro *RunningOutput) write(metrics []telegraf.Metric) error { func (ro *RunningOutput) write(metrics []telegraf.Metric) error {
if metrics == nil || len(metrics) == 0 { nMetrics := len(metrics)
if nMetrics == 0 {
return nil return nil
} }
start := time.Now() start := time.Now()
err := ro.Output.Write(metrics) err := ro.Output.Write(metrics)
elapsed := time.Since(start) elapsed := time.Since(start)
if err == nil { if err == nil {
if !ro.Quiet { log.Printf("D! Output [%s] wrote batch of %d metrics in %s\n",
log.Printf("I! Output [%s] wrote batch of %d metrics in %s\n", ro.Name, nMetrics, elapsed)
ro.Name, len(metrics), elapsed) ro.MetricsWritten.Incr(int64(nMetrics))
} ro.BufferSize.Incr(-int64(nMetrics))
ro.WriteTime.Incr(elapsed.Nanoseconds())
} }
return err return err
} }

View File

@ -36,7 +36,6 @@ func BenchmarkRunningOutputAddWrite(b *testing.B) {
m := &perfOutput{} m := &perfOutput{}
ro := NewRunningOutput("test", m, conf, 1000, 10000) ro := NewRunningOutput("test", m, conf, 1000, 10000)
ro.Quiet = true
for n := 0; n < b.N; n++ { for n := 0; n < b.N; n++ {
ro.AddMetric(testutil.TestMetric(101, "metric1")) ro.AddMetric(testutil.TestMetric(101, "metric1"))
@ -52,7 +51,6 @@ func BenchmarkRunningOutputAddWriteEvery100(b *testing.B) {
m := &perfOutput{} m := &perfOutput{}
ro := NewRunningOutput("test", m, conf, 1000, 10000) ro := NewRunningOutput("test", m, conf, 1000, 10000)
ro.Quiet = true
for n := 0; n < b.N; n++ { for n := 0; n < b.N; n++ {
ro.AddMetric(testutil.TestMetric(101, "metric1")) ro.AddMetric(testutil.TestMetric(101, "metric1"))
@ -71,7 +69,6 @@ func BenchmarkRunningOutputAddFailWrites(b *testing.B) {
m := &perfOutput{} m := &perfOutput{}
m.failWrite = true m.failWrite = true
ro := NewRunningOutput("test", m, conf, 1000, 10000) ro := NewRunningOutput("test", m, conf, 1000, 10000)
ro.Quiet = true
for n := 0; n < b.N; n++ { for n := 0; n < b.N; n++ {
ro.AddMetric(testutil.TestMetric(101, "metric1")) ro.AddMetric(testutil.TestMetric(101, "metric1"))

View File

@ -27,6 +27,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/http_response" _ "github.com/influxdata/telegraf/plugins/inputs/http_response"
_ "github.com/influxdata/telegraf/plugins/inputs/httpjson" _ "github.com/influxdata/telegraf/plugins/inputs/httpjson"
_ "github.com/influxdata/telegraf/plugins/inputs/influxdb" _ "github.com/influxdata/telegraf/plugins/inputs/influxdb"
_ "github.com/influxdata/telegraf/plugins/inputs/internal"
_ "github.com/influxdata/telegraf/plugins/inputs/ipmi_sensor" _ "github.com/influxdata/telegraf/plugins/inputs/ipmi_sensor"
_ "github.com/influxdata/telegraf/plugins/inputs/iptables" _ "github.com/influxdata/telegraf/plugins/inputs/iptables"
_ "github.com/influxdata/telegraf/plugins/inputs/jolokia" _ "github.com/influxdata/telegraf/plugins/inputs/jolokia"

View File

@ -14,6 +14,7 @@ import (
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/selfstat"
) )
const ( const (
@ -43,6 +44,18 @@ type HTTPListener struct {
parser influx.InfluxParser parser influx.InfluxParser
acc telegraf.Accumulator acc telegraf.Accumulator
pool *pool pool *pool
BytesRecv selfstat.Stat
RequestsServed selfstat.Stat
WritesServed selfstat.Stat
QueriesServed selfstat.Stat
PingsServed selfstat.Stat
RequestsRecv selfstat.Stat
WritesRecv selfstat.Stat
QueriesRecv selfstat.Stat
PingsRecv selfstat.Stat
NotFoundsServed selfstat.Stat
BuffersCreated selfstat.Stat
} }
const sampleConfig = ` const sampleConfig = `
@ -72,7 +85,7 @@ func (h *HTTPListener) Description() string {
} }
func (h *HTTPListener) Gather(_ telegraf.Accumulator) error { func (h *HTTPListener) Gather(_ telegraf.Accumulator) error {
log.Printf("D! The http_listener has created %d buffers", h.pool.ncreated()) h.BuffersCreated.Set(h.pool.ncreated())
return nil return nil
} }
@ -81,6 +94,21 @@ func (h *HTTPListener) Start(acc telegraf.Accumulator) error {
h.mu.Lock() h.mu.Lock()
defer h.mu.Unlock() defer h.mu.Unlock()
tags := map[string]string{
"address": h.ServiceAddress,
}
h.BytesRecv = selfstat.Register("http_listener", "bytes_received", tags)
h.RequestsServed = selfstat.Register("http_listener", "requests_served", tags)
h.WritesServed = selfstat.Register("http_listener", "writes_served", tags)
h.QueriesServed = selfstat.Register("http_listener", "queries_served", tags)
h.PingsServed = selfstat.Register("http_listener", "pings_served", tags)
h.RequestsRecv = selfstat.Register("http_listener", "requests_received", tags)
h.WritesRecv = selfstat.Register("http_listener", "writes_received", tags)
h.QueriesRecv = selfstat.Register("http_listener", "queries_received", tags)
h.PingsRecv = selfstat.Register("http_listener", "pings_received", tags)
h.NotFoundsServed = selfstat.Register("http_listener", "not_founds_served", tags)
h.BuffersCreated = selfstat.Register("http_listener", "buffers_created", tags)
if h.MaxBodySize == 0 { if h.MaxBodySize == 0 {
h.MaxBodySize = DEFAULT_MAX_BODY_SIZE h.MaxBodySize = DEFAULT_MAX_BODY_SIZE
} }
@ -141,10 +169,16 @@ func (h *HTTPListener) httpListen() error {
} }
func (h *HTTPListener) ServeHTTP(res http.ResponseWriter, req *http.Request) { func (h *HTTPListener) ServeHTTP(res http.ResponseWriter, req *http.Request) {
h.RequestsRecv.Incr(1)
defer h.RequestsServed.Incr(1)
switch req.URL.Path { switch req.URL.Path {
case "/write": case "/write":
h.WritesRecv.Incr(1)
defer h.WritesServed.Incr(1)
h.serveWrite(res, req) h.serveWrite(res, req)
case "/query": case "/query":
h.QueriesRecv.Incr(1)
defer h.QueriesServed.Incr(1)
// Deliver a dummy response to the query endpoint, as some InfluxDB // Deliver a dummy response to the query endpoint, as some InfluxDB
// clients test endpoint availability with a query // clients test endpoint availability with a query
res.Header().Set("Content-Type", "application/json") res.Header().Set("Content-Type", "application/json")
@ -152,9 +186,12 @@ func (h *HTTPListener) ServeHTTP(res http.ResponseWriter, req *http.Request) {
res.WriteHeader(http.StatusOK) res.WriteHeader(http.StatusOK)
res.Write([]byte("{\"results\":[]}")) res.Write([]byte("{\"results\":[]}"))
case "/ping": case "/ping":
h.PingsRecv.Incr(1)
defer h.PingsServed.Incr(1)
// respond to ping requests // respond to ping requests
res.WriteHeader(http.StatusNoContent) res.WriteHeader(http.StatusNoContent)
default: default:
defer h.NotFoundsServed.Incr(1)
// Don't know how to respond to calls to other endpoints // Don't know how to respond to calls to other endpoints
http.NotFound(res, req) http.NotFound(res, req)
} }
@ -195,6 +232,7 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {
badRequest(res) badRequest(res)
return return
} }
h.BytesRecv.Incr(int64(n))
if err == io.EOF { if err == io.EOF {
if return400 { if return400 {

View File

@ -0,0 +1,83 @@
# Internal Input Plugin
The `internal` plugin collects metrics about the telegraf agent itself.
Note that some metrics are aggregates across all instances of one type of
plugin.
### Configuration:
```toml
# Collect statistics about itself
[[inputs.internal]]
## If true, collect telegraf memory stats.
# collect_memstats = true
```
### Measurements & Fields:
memstats are taken from the Go runtime: https://golang.org/pkg/runtime/#MemStats
- internal\_memstats
- alloc\_bytes
- frees
- heap\_alloc\_bytes
- heap\_idle\_bytes
- heap\_in\_use\_bytes
- heap\_objects\_bytes
- heap\_released\_bytes
- heap\_sys\_bytes
- mallocs
- num\_gc
- pointer\_lookups
- sys\_bytes
- total\_alloc\_bytes
agent stats collect aggregate stats on all telegraf plugins.
- internal\_agent
- gather\_errors
- metrics\_dropped
- metrics\_gathered
- metrics\_written
internal\_gather stats collect aggregate stats on all input plugins
that are of the same input type. They are tagged with `input=<plugin_name>`.
- internal\_gather
- gather\_time\_ns
- metrics\_gathered
internal\_write stats collect aggregate stats on all output plugins
that are of the same output type. They are tagged with `output=<plugin_name>`.
- internal\_write
- buffer\_limit
- buffer\_size
- metrics\_written
- metrics\_filtered
- write\_time\_ns
internal\_\<plugin\_name\> are metrics which are defined on a per-plugin basis, and
usually contain tags which differentiate each instance of a particular type of
plugin.
- internal\_\<plugin\_name\>
- individual plugin-specific fields, such as requests counts.
### Tags:
All measurements for specific plugins are tagged with information relevant
to each particular plugin.
### Example Output:
```
internal_memstats,host=tyrion alloc_bytes=4457408i,sys_bytes=10590456i,pointer_lookups=7i,mallocs=17642i,frees=7473i,heap_sys_bytes=6848512i,heap_idle_bytes=1368064i,heap_in_use_bytes=5480448i,heap_released_bytes=0i,total_alloc_bytes=6875560i,heap_alloc_bytes=4457408i,heap_objects_bytes=10169i,num_gc=2i 1480682800000000000
internal_agent,host=tyrion metrics_written=18i,metrics_dropped=0i,metrics_gathered=19i,gather_errors=0i 1480682800000000000
internal_write,output=file,host=tyrion buffer_limit=10000i,write_time_ns=636609i,metrics_written=18i,buffer_size=0i 1480682800000000000
internal_gather,input=internal,host=tyrion metrics_gathered=19i,gather_time_ns=442114i 1480682800000000000
internal_gather,input=http_listener,host=tyrion metrics_gathered=0i,gather_time_ns=167285i 1480682800000000000
internal_http_listener,address=:8186,host=tyrion queries_received=0i,writes_received=0i,requests_received=0i,buffers_created=0i,requests_served=0i,pings_received=0i,bytes_received=0i,not_founds_served=0i,pings_served=0i,queries_served=0i,writes_served=0i 1480682800000000000
```

View File

@ -0,0 +1,66 @@
package internal
import (
"runtime"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/selfstat"
)
// Self is the "internal" input plugin. It reports statistics about the
// running telegraf agent itself.
type Self struct {
	// CollectMemstats controls whether Go runtime memory statistics
	// (runtime.MemStats) are gathered in addition to registered selfstats.
	CollectMemstats bool
}

// NewSelf returns a Self input with memstats collection enabled by default.
func NewSelf() telegraf.Input {
	return &Self{
		CollectMemstats: true,
	}
}

var sampleConfig = `
  ## If true, collect telegraf memory stats.
  # collect_memstats = true
`

// Description returns a one-line description of the plugin.
func (s *Self) Description() string {
	return "Collect statistics about itself"
}

// SampleConfig returns the default plugin configuration snippet.
func (s *Self) SampleConfig() string {
	return sampleConfig
}
// Gather collects telegraf's own statistics.
//
// When CollectMemstats is enabled, a snapshot of the Go runtime memory
// statistics (https://golang.org/pkg/runtime/#MemStats) is emitted as the
// "internal_memstats" measurement. All stats registered through the
// selfstat package are then forwarded as individual metrics.
func (s *Self) Gather(acc telegraf.Accumulator) error {
	if s.CollectMemstats {
		var mem runtime.MemStats
		runtime.ReadMemStats(&mem)
		memFields := map[string]interface{}{
			"alloc_bytes":       mem.Alloc,      // bytes allocated and not yet freed
			"total_alloc_bytes": mem.TotalAlloc, // bytes allocated (even if freed)
			"sys_bytes":         mem.Sys,        // bytes obtained from system (sum of XxxSys below)
			"pointer_lookups":   mem.Lookups,    // number of pointer lookups
			"mallocs":           mem.Mallocs,    // number of mallocs
			"frees":             mem.Frees,      // number of frees
			// Main allocation heap statistics.
			"heap_alloc_bytes":    mem.HeapAlloc,    // bytes allocated and not yet freed (same as Alloc above)
			"heap_sys_bytes":      mem.HeapSys,      // bytes obtained from system
			"heap_idle_bytes":     mem.HeapIdle,     // bytes in idle spans
			"heap_in_use_bytes":   mem.HeapInuse,    // bytes in non-idle span
			"heap_released_bytes": mem.HeapReleased, // bytes released to the OS
			"heap_objects_bytes":  mem.HeapObjects,  // total number of allocated objects (a count, despite the _bytes suffix)
			"num_gc":              mem.NumGC,
		}
		acc.AddFields("internal_memstats", memFields, map[string]string{})
	}

	// Forward every stat registered via the selfstat package.
	for _, m := range selfstat.Metrics() {
		acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
	}

	return nil
}
// init registers the plugin constructor with telegraf's input registry
// under the name "internal".
func init() {
	inputs.Add("internal", NewSelf)
}

View File

@ -0,0 +1,62 @@
package internal
import (
"testing"
"github.com/influxdata/telegraf/selfstat"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
)
// TestSelfPlugin verifies end-to-end that stats registered via the selfstat
// package surface as metrics when the internal plugin gathers: plain stats
// accumulate with Incr and overwrite with Set, and a timing stat sharing the
// same measurement reports its average alongside the regular field.
// NOTE(review): relies on the process-global selfstat registry, so ordering
// of the steps below matters.
func TestSelfPlugin(t *testing.T) {
	s := NewSelf()
	acc := &testutil.Accumulator{}

	s.Gather(acc)
	assert.True(t, acc.HasMeasurement("internal_memstats"))

	// test that a registered stat is incremented
	stat := selfstat.Register("mytest", "test", map[string]string{"test": "foo"})
	stat.Incr(1)
	stat.Incr(2)
	s.Gather(acc)
	acc.AssertContainsTaggedFields(t, "internal_mytest",
		map[string]interface{}{
			"test": int64(3),
		},
		map[string]string{
			"test": "foo",
		},
	)
	acc.ClearMetrics()

	// test that a registered stat is set properly
	stat.Set(101)
	s.Gather(acc)
	acc.AssertContainsTaggedFields(t, "internal_mytest",
		map[string]interface{}{
			"test": int64(101),
		},
		map[string]string{
			"test": "foo",
		},
	)
	acc.ClearMetrics()

	// test that regular and timing stats can share the same measurement, and
	// that timings are set properly.
	timing := selfstat.RegisterTiming("mytest", "test_ns", map[string]string{"test": "foo"})
	timing.Incr(100)
	timing.Incr(200)
	s.Gather(acc)
	acc.AssertContainsTaggedFields(t, "internal_mytest",
		map[string]interface{}{
			"test":    int64(101), // unchanged regular stat
			"test_ns": int64(150), // average of 100 and 200
		},
		map[string]string{
			"test": "foo",
		},
	)
}

View File

@ -11,6 +11,7 @@ import (
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/selfstat"
) )
type TcpListener struct { type TcpListener struct {
@ -41,6 +42,12 @@ type TcpListener struct {
parser parsers.Parser parser parsers.Parser
acc telegraf.Accumulator acc telegraf.Accumulator
MaxConnections selfstat.Stat
CurrentConnections selfstat.Stat
TotalConnections selfstat.Stat
PacketsRecv selfstat.Stat
BytesRecv selfstat.Stat
} }
var dropwarn = "E! Error: tcp_listener message queue full. " + var dropwarn = "E! Error: tcp_listener message queue full. " +
@ -91,6 +98,16 @@ func (t *TcpListener) Start(acc telegraf.Accumulator) error {
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
tags := map[string]string{
"address": t.ServiceAddress,
}
t.MaxConnections = selfstat.Register("tcp_listener", "max_connections", tags)
t.MaxConnections.Set(int64(t.MaxTCPConnections))
t.CurrentConnections = selfstat.Register("tcp_listener", "current_connections", tags)
t.TotalConnections = selfstat.Register("tcp_listener", "total_connections", tags)
t.PacketsRecv = selfstat.Register("tcp_listener", "packets_received", tags)
t.BytesRecv = selfstat.Register("tcp_listener", "bytes_received", tags)
t.acc = acc t.acc = acc
t.in = make(chan []byte, t.AllowedPendingMessages) t.in = make(chan []byte, t.AllowedPendingMessages)
t.done = make(chan struct{}) t.done = make(chan struct{})
@ -189,6 +206,8 @@ func (t *TcpListener) refuser(conn *net.TCPConn) {
// handler handles a single TCP Connection // handler handles a single TCP Connection
func (t *TcpListener) handler(conn *net.TCPConn, id string) { func (t *TcpListener) handler(conn *net.TCPConn, id string) {
t.CurrentConnections.Incr(1)
t.TotalConnections.Incr(1)
// connection cleanup function // connection cleanup function
defer func() { defer func() {
t.wg.Done() t.wg.Done()
@ -196,6 +215,7 @@ func (t *TcpListener) handler(conn *net.TCPConn, id string) {
// Add one connection potential back to channel when this one closes // Add one connection potential back to channel when this one closes
t.accept <- true t.accept <- true
t.forget(id) t.forget(id)
t.CurrentConnections.Incr(-1)
}() }()
var n int var n int
@ -212,6 +232,8 @@ func (t *TcpListener) handler(conn *net.TCPConn, id string) {
if n == 0 { if n == 0 {
continue continue
} }
t.BytesRecv.Incr(int64(n))
t.PacketsRecv.Incr(1)
bufCopy := make([]byte, n+1) bufCopy := make([]byte, n+1)
copy(bufCopy, scanner.Bytes()) copy(bufCopy, scanner.Bytes())
bufCopy[n] = '\n' bufCopy[n] = '\n'

View File

@ -9,6 +9,7 @@ import (
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/selfstat"
) )
// UdpListener main struct for the collector // UdpListener main struct for the collector
@ -48,6 +49,9 @@ type UdpListener struct {
acc telegraf.Accumulator acc telegraf.Accumulator
listener *net.UDPConn listener *net.UDPConn
PacketsRecv selfstat.Stat
BytesRecv selfstat.Stat
} }
// UDP_MAX_PACKET_SIZE is packet limit, see // UDP_MAX_PACKET_SIZE is packet limit, see
@ -102,6 +106,12 @@ func (u *UdpListener) Start(acc telegraf.Accumulator) error {
u.Lock() u.Lock()
defer u.Unlock() defer u.Unlock()
tags := map[string]string{
"address": u.ServiceAddress,
}
u.PacketsRecv = selfstat.Register("udp_listener", "packets_received", tags)
u.BytesRecv = selfstat.Register("udp_listener", "bytes_received", tags)
u.acc = acc u.acc = acc
u.in = make(chan []byte, u.AllowedPendingMessages) u.in = make(chan []byte, u.AllowedPendingMessages)
u.done = make(chan struct{}) u.done = make(chan struct{})
@ -162,6 +172,8 @@ func (u *UdpListener) udpListen() error {
} }
continue continue
} }
u.BytesRecv.Incr(int64(n))
u.PacketsRecv.Incr(1)
bufCopy := make([]byte, n) bufCopy := make([]byte, n)
copy(bufCopy, buf[:n]) copy(bufCopy, buf[:n])

169
selfstat/selfstat.go Normal file
View File

@ -0,0 +1,169 @@
// selfstat is a package for tracking and collecting internal statistics
// about telegraf. Metrics can be registered using this package, and then
// incremented or set within your code. If the inputs.internal plugin is enabled,
// then all registered stats will be collected as they would by any other input
// plugin.
package selfstat
import (
"hash/fnv"
"log"
"sort"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
)
var (
registry *rgstry
)
// Stat is an interface for dealing with telegraf statistics collected
// on itself. Implementations are safe for concurrent use (the provided
// implementations use sync/atomic and sync.Mutex respectively).
type Stat interface {
	// Name is the name of the measurement
	Name() string

	// FieldName is the name of the measurement field
	FieldName() string

	// Tags is a tag map. Each time this is called a new map is allocated.
	Tags() map[string]string

	// Key is the unique measurement+tags key of the stat.
	Key() uint64

	// Incr increments a regular stat by 'v'.
	// in the case of a timing stat, increment adds the timing to the cache.
	Incr(v int64)

	// Set sets a regular stat to 'v'.
	// in the case of a timing stat, set adds the timing to the cache.
	Set(v int64)

	// Get gets the value of the stat. In the case of timings, this returns
	// an average value of all timings received since the last call to Get().
	// If no timings were received, it returns the previous value.
	Get() int64
}
// Register registers the given measurement, field, and tags in the selfstat
// registry. If an identical measurement+field+tags combination has already
// been registered, the existing Stat is returned so all callers share one
// counter. The measurement is automatically prefixed with "internal_".
//
// The returned Stat can be incremented by the consumer of Register(), and
// its value will be returned as a telegraf metric when Metrics() is called.
func Register(measurement, field string, tags map[string]string) Stat {
	s := &stat{
		measurement: "internal_" + measurement,
		field:       field,
		tags:        tags,
	}
	return registry.register(s)
}
// RegisterTiming registers the given measurement, field, and tags in the
// selfstat registry. If an identical measurement+field+tags combination has
// already been registered, the existing Stat is returned. The measurement is
// automatically prefixed with "internal_".
//
// Timing stats differ from regular stats in that they accumulate multiple
// "timings" added to them, and will return the average when Get() is called.
// After Get() is called, the average is cleared and the next timing returned
// from Get() will only reflect timings added since the previous call to Get().
// If Get() is called without receiving any new timings, then the previous
// value is used.
//
// In other words, timings are an averaged metric that get cleared on each
// call to Get().
//
// The returned Stat can be incremented by the consumer of RegisterTiming(),
// and its value will be returned as a telegraf metric when Metrics() is
// called.
func RegisterTiming(measurement, field string, tags map[string]string) Stat {
	ts := &timingStat{
		measurement: "internal_" + measurement,
		field:       field,
		tags:        tags,
	}
	return registry.register(ts)
}
// Metrics returns all registered stats as telegraf metrics, one metric per
// unique measurement+tags combination, with one field per registered stat.
// All metrics share a single timestamp taken when Metrics() is called.
//
// Fix: the previous implementation preallocated the result slice to
// len(registry.stats) and filled it by index; whenever an entry was skipped
// (empty stats map, or an error from metric.New) the returned slice kept
// trailing nil elements, which callers would then dereference. The slice is
// now built with append so it contains exactly the metrics created. The
// local variable shadowing the "metric" package is also renamed.
func Metrics() []telegraf.Metric {
	registry.mu.Lock()
	defer registry.mu.Unlock()

	now := time.Now()
	metrics := make([]telegraf.Metric, 0, len(registry.stats))
	for _, stats := range registry.stats {
		if len(stats) == 0 {
			continue
		}
		// All stats in one bucket share the same measurement name and
		// tag set, so take them from the first stat seen.
		var tags map[string]string
		var name string
		fields := make(map[string]interface{}, len(stats))
		for fieldname, stat := range stats {
			if tags == nil {
				tags = stat.Tags()
				name = stat.Name()
			}
			fields[fieldname] = stat.Get()
		}
		m, err := metric.New(name, tags, fields, now)
		if err != nil {
			log.Printf("E! Error creating selfstat metric: %s", err)
			continue
		}
		metrics = append(metrics, m)
	}
	return metrics
}
// rgstry is the backing store for all registered stats, keyed first by
// Stat.Key() (the measurement+tags hash) and then by field name.
type rgstry struct {
	stats map[uint64]map[string]Stat
	mu    sync.Mutex
}

// register adds s to the registry and returns it. If a stat with the same
// key and field name is already registered, the existing Stat is returned
// instead, so that every caller shares a single counter.
//
// Fix: idiomatic early-return replaces the else-after-return branch, and the
// already-fetched inner map is reused instead of a second r.stats[s.Key()]
// lookup. Behavior is unchanged.
func (r *rgstry) register(s Stat) Stat {
	r.mu.Lock()
	defer r.mu.Unlock()

	stats, ok := r.stats[s.Key()]
	if !ok {
		// first stat for this measurement+tags combination
		r.stats[s.Key()] = map[string]Stat{s.FieldName(): s}
		return s
	}
	if stat, ok := stats[s.FieldName()]; ok {
		// field already exists, so don't create a new one
		return stat
	}
	stats[s.FieldName()] = s
	return s
}
// key returns a deterministic FNV-1a hash of the measurement name combined
// with its tag set. Key/value pairs are sorted before hashing, so the result
// is independent of Go's randomized map iteration order.
func key(measurement string, tags map[string]string) uint64 {
	hasher := fnv.New64a()
	hasher.Write([]byte(measurement))

	pairs := make([]string, 0, len(tags))
	for k, v := range tags {
		pairs = append(pairs, k+v)
	}
	sort.Strings(pairs)

	for _, p := range pairs {
		hasher.Write([]byte(p))
	}
	return hasher.Sum64()
}
// init creates the package-global registry that Register, RegisterTiming,
// and Metrics operate on.
func init() {
	registry = &rgstry{
		stats: make(map[uint64]map[string]Stat),
	}
}

221
selfstat/selfstat_test.go Normal file
View File

@ -0,0 +1,221 @@
package selfstat
import (
"sync"
"testing"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
)
var (
	// only allow one test at a time
	// this is because we are dealing with a global registry
	testLock sync.Mutex

	// a is a package-level sink for benchmark results so the compiler
	// cannot eliminate the measured Get() calls as dead code.
	a int64
)

// testCleanup resets the global registry for test cleanup & unlocks the test lock
func testCleanup() {
	registry = &rgstry{
		stats: make(map[uint64]map[string]Stat),
	}
	testLock.Unlock()
}
// BenchmarkStats measures the per-iteration cost of Incr and Get on a
// regular (atomic counter) stat.
func BenchmarkStats(b *testing.B) {
	testLock.Lock()
	defer testCleanup()
	b1 := Register("benchmark1", "test_field1", map[string]string{"test": "foo"})
	for n := 0; n < b.N; n++ {
		b1.Incr(1)
		b1.Incr(3)
		a = b1.Get() // sink into package var so Get isn't optimized away
	}
}

// BenchmarkTimingStats measures the per-iteration cost of Incr and Get on a
// timing (mutex-guarded averaging) stat.
func BenchmarkTimingStats(b *testing.B) {
	testLock.Lock()
	defer testCleanup()
	b2 := RegisterTiming("benchmark2", "test_field1", map[string]string{"test": "foo"})
	for n := 0; n < b.N; n++ {
		b2.Incr(1)
		b2.Incr(3)
		a = b2.Get()
	}
}
// TestRegisterAndIncrAndSet exercises the lifecycle of a regular stat:
// zero initial value, Incr accumulation (including negative increments),
// Set overwriting, and the guarantee that re-registering an identical
// measurement+field+tags combination returns the same underlying stat.
func TestRegisterAndIncrAndSet(t *testing.T) {
	testLock.Lock()
	defer testCleanup()
	s1 := Register("test", "test_field1", map[string]string{"test": "foo"})
	s2 := Register("test", "test_field2", map[string]string{"test": "foo"})
	assert.Equal(t, int64(0), s1.Get())

	s1.Incr(10)
	s1.Incr(5)
	assert.Equal(t, int64(15), s1.Get())
	s1.Set(12)
	assert.Equal(t, int64(12), s1.Get())
	s1.Incr(-2)
	assert.Equal(t, int64(10), s1.Get())

	s2.Set(101)
	assert.Equal(t, int64(101), s2.Get())

	// make sure that the same field returns the same metric
	// this one should be the same as s2.
	foo := Register("test", "test_field2", map[string]string{"test": "foo"})
	assert.Equal(t, int64(101), foo.Get())

	// check that tags are consistent
	assert.Equal(t, map[string]string{"test": "foo"}, foo.Tags())
	assert.Equal(t, "internal_test", foo.Name())
}
// TestRegisterTimingAndIncrAndSet exercises a timing stat: Get returns the
// average of values received since the previous Get, repeats the previous
// average when nothing new arrived, and Set behaves like Incr (the value is
// added to the timing cache rather than overwriting).
func TestRegisterTimingAndIncrAndSet(t *testing.T) {
	testLock.Lock()
	defer testCleanup()
	s1 := RegisterTiming("test", "test_field1_ns", map[string]string{"test": "foo"})
	s2 := RegisterTiming("test", "test_field2_ns", map[string]string{"test": "foo"})
	assert.Equal(t, int64(0), s1.Get())

	s1.Incr(10)
	s1.Incr(5)
	assert.Equal(t, int64(7), s1.Get()) // integer average of 10 and 5

	// previous value is used on subsequent calls to Get()
	assert.Equal(t, int64(7), s1.Get())
	s1.Set(12)
	assert.Equal(t, int64(12), s1.Get())
	s1.Incr(-2)
	// only one timing (-2) was cached since the last Get, so that is the average
	assert.Equal(t, int64(-2), s1.Get())

	s2.Set(101)
	assert.Equal(t, int64(101), s2.Get())

	// make sure that the same field returns the same metric
	// this one should be the same as s2.
	foo := RegisterTiming("test", "test_field2_ns", map[string]string{"test": "foo"})
	assert.Equal(t, int64(101), foo.Get())

	// check that tags are consistent
	assert.Equal(t, map[string]string{"test": "foo"}, foo.Tags())
	assert.Equal(t, "internal_test", foo.Name())
}
// TestStatKeyConsistency verifies that Stat.Key() is stable across repeated
// calls (the cached value never changes) and that two stats built from the
// same measurement and tags always hash to the same key, despite Go's
// randomized map iteration order (key() sorts the tag pairs before hashing).
func TestStatKeyConsistency(t *testing.T) {
	s := &stat{
		measurement: "internal_stat",
		field:       "myfield",
		tags: map[string]string{
			"foo":   "bar",
			"bar":   "baz",
			"whose": "first",
		},
	}
	k := s.Key()
	for i := 0; i < 5000; i++ {
		// assert that the Key() func doesn't change anything.
		assert.Equal(t, k, s.Key())

		// assert that two identical measurements always produce the same key.
		tmp := &stat{
			measurement: "internal_stat",
			field:       "myfield",
			tags: map[string]string{
				"foo":   "bar",
				"bar":   "baz",
				"whose": "first",
			},
		}
		assert.Equal(t, k, tmp.Key())
	}
}
// TestRegisterMetricsAndVerify checks how registered stats roll up into
// telegraf metrics: stats sharing a measurement+tags key become fields of
// one metric, distinct tag sets produce distinct metrics, and timing and
// regular stats coexist. The expected Metrics() lengths (1, 3, 5) count
// unique measurement+tags combinations, not individual fields.
func TestRegisterMetricsAndVerify(t *testing.T) {
	testLock.Lock()
	defer testCleanup()

	// register two metrics with the same key
	s1 := RegisterTiming("test_timing", "test_field1_ns", map[string]string{"test": "foo"})
	s2 := RegisterTiming("test_timing", "test_field2_ns", map[string]string{"test": "foo"})
	s1.Incr(10)
	s2.Incr(15)
	assert.Len(t, Metrics(), 1)

	// register two more metrics with different keys
	s3 := RegisterTiming("test_timing", "test_field1_ns", map[string]string{"test": "bar"})
	s4 := RegisterTiming("test_timing", "test_field2_ns", map[string]string{"test": "baz"})
	s3.Incr(10)
	s4.Incr(15)
	assert.Len(t, Metrics(), 3)

	// register some non-timing metrics
	s5 := Register("test", "test_field1", map[string]string{"test": "bar"})
	s6 := Register("test", "test_field2", map[string]string{"test": "baz"})
	Register("test", "test_field3", map[string]string{"test": "baz"})
	s5.Incr(10)
	s5.Incr(18)
	s6.Incr(15)
	assert.Len(t, Metrics(), 5)

	acc := testutil.Accumulator{}
	acc.AddMetrics(Metrics())

	// verify s1 & s2
	acc.AssertContainsTaggedFields(t, "internal_test_timing",
		map[string]interface{}{
			"test_field1_ns": int64(10),
			"test_field2_ns": int64(15),
		},
		map[string]string{
			"test": "foo",
		},
	)

	// verify s3
	acc.AssertContainsTaggedFields(t, "internal_test_timing",
		map[string]interface{}{
			"test_field1_ns": int64(10),
		},
		map[string]string{
			"test": "bar",
		},
	)

	// verify s4
	acc.AssertContainsTaggedFields(t, "internal_test_timing",
		map[string]interface{}{
			"test_field2_ns": int64(15),
		},
		map[string]string{
			"test": "baz",
		},
	)

	// verify s5
	acc.AssertContainsTaggedFields(t, "internal_test",
		map[string]interface{}{
			"test_field1": int64(28),
		},
		map[string]string{
			"test": "bar",
		},
	)

	// verify s6 & the unnamed "test_field3" stat (never incremented, so 0)
	acc.AssertContainsTaggedFields(t, "internal_test",
		map[string]interface{}{
			"test_field2": int64(15),
			"test_field3": int64(0),
		},
		map[string]string{
			"test": "baz",
		},
	)
}

50
selfstat/stat.go Normal file
View File

@ -0,0 +1,50 @@
package selfstat
import (
"sync/atomic"
)
// stat implements Stat as a simple counter/gauge backed by sync/atomic.
type stat struct {
	// v is manipulated with sync/atomic; it is kept as the first field so
	// it remains 64-bit aligned on 32-bit platforms, as required by the
	// sync/atomic package.
	v           int64
	measurement string
	field       string
	tags        map[string]string
	// key caches the result of key(measurement, tags); 0 means "not yet
	// computed".
	key uint64
}

// Incr adds v to the stat's current value.
func (s *stat) Incr(v int64) {
	atomic.AddInt64(&s.v, v)
}

// Set replaces the stat's current value with v.
func (s *stat) Set(v int64) {
	atomic.StoreInt64(&s.v, v)
}

// Get returns the stat's current value.
func (s *stat) Get() int64 {
	return atomic.LoadInt64(&s.v)
}

// Name returns the measurement name of the stat.
func (s *stat) Name() string {
	return s.measurement
}

// FieldName returns the name of the measurement field.
func (s *stat) FieldName() string {
	return s.field
}

// Tags returns a copy of the stat's tags.
// NOTE this allocates a new map every time it is called.
func (s *stat) Tags() map[string]string {
	m := make(map[string]string, len(s.tags))
	for k, v := range s.tags {
		m[k] = v
	}
	return m
}

// Key returns the unique measurement+tags hash of the stat, computing and
// caching it lazily on first call.
// NOTE(review): the cache write is unsynchronized — this appears safe only
// because Key() is first invoked during registration, before the stat is
// shared across goroutines; confirm no later concurrent first call exists.
func (s *stat) Key() uint64 {
	if s.key == 0 {
		s.key = key(s.measurement, s.tags)
	}
	return s.key
}

66
selfstat/timingStat.go Normal file
View File

@ -0,0 +1,66 @@
package selfstat
import (
"sync"
)
// timingStat implements Stat for timing measurements. Unlike a plain stat,
// it accumulates every value added to it and reports the average on Get(),
// clearing the accumulator afterwards. All state is guarded by mu.
type timingStat struct {
	measurement string
	field       string
	tags        map[string]string
	// key caches the result of key(measurement, tags); 0 means "not yet
	// computed".
	key uint64
	// v is the sum of timings received since the last Get().
	v int64
	// prev is the average returned by the previous Get(), reused when no
	// new timings have arrived.
	prev int64
	// count is the number of timings received since the last Get().
	count int64
	mu    sync.Mutex
}

// Incr adds the timing v to the accumulation cache.
func (s *timingStat) Incr(v int64) {
	s.mu.Lock()
	s.v += v
	s.count++
	s.mu.Unlock()
}

// Set behaves exactly like Incr for timing stats: the value is added to the
// timing cache rather than overwriting it (see the Stat interface docs).
func (s *timingStat) Set(v int64) {
	s.Incr(v)
}
// Get returns the integer average of all timings added since the last call
// to Get. When no new timings have been recorded, the previously returned
// average is reported again. The accumulated sum and count are cleared on
// every call that observed new timings.
func (s *timingStat) Get() int64 {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.count == 0 {
		return s.prev
	}

	avg := s.v / s.count
	s.prev = avg
	s.v, s.count = 0, 0
	return avg
}
// Name returns the measurement name of the stat.
func (s *timingStat) Name() string {
	return s.measurement
}

// FieldName returns the name of the measurement field.
func (s *timingStat) FieldName() string {
	return s.field
}

// Tags returns a copy of the timingStat's tags.
// NOTE this allocates a new map every time it is called.
func (s *timingStat) Tags() map[string]string {
	m := make(map[string]string, len(s.tags))
	for k, v := range s.tags {
		m[k] = v
	}
	return m
}

// Key returns the unique measurement+tags hash of the stat, computing and
// caching it lazily on first call.
// NOTE(review): the cache write is not guarded by s.mu — this appears safe
// only because Key() is first invoked during registration, before the stat
// is shared across goroutines; confirm no later concurrent first call exists.
func (s *timingStat) Key() uint64 {
	if s.key == 0 {
		s.key = key(s.measurement, s.tags)
	}
	return s.key
}

View File

@ -9,6 +9,8 @@ import (
"testing" "testing"
"time" "time"
"github.com/influxdata/telegraf"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
@ -110,6 +112,12 @@ func (a *Accumulator) AddGauge(
a.AddFields(measurement, fields, tags, timestamp...) a.AddFields(measurement, fields, tags, timestamp...)
} }
func (a *Accumulator) AddMetrics(metrics []telegraf.Metric) {
for _, m := range metrics {
a.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
}
}
// AddError appends the given error to Accumulator.Errors. // AddError appends the given error to Accumulator.Errors.
func (a *Accumulator) AddError(err error) { func (a *Accumulator) AddError(err error) {
if err == nil { if err == nil {