diff --git a/CHANGELOG.md b/CHANGELOG.md index 5958ad099..6e580cf1f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ - [#2074](https://github.com/influxdata/telegraf/pull/2074): "discard" output plugin added, primarily for testing purposes. - [#1965](https://github.com/influxdata/telegraf/pull/1965): The JSON parser can now parse an array of objects using the same configuration. - [#1807](https://github.com/influxdata/telegraf/pull/1807): Option to use device name rather than path for reporting disk stats. +- [#1348](https://github.com/influxdata/telegraf/issues/1348): Telegraf "internal" plugin for collecting stats on itself. ### Bugfixes diff --git a/README.md b/README.md index 6577eafc8..dbcf9770d 100644 --- a/README.md +++ b/README.md @@ -159,6 +159,7 @@ configuration options. * [hddtemp](./plugins/inputs/hddtemp) * [http_response](./plugins/inputs/http_response) * [httpjson](./plugins/inputs/httpjson) (generic JSON-emitting http service plugin) +* [internal](./plugins/inputs/internal) * [influxdb](./plugins/inputs/influxdb) * [ipmi_sensor](./plugins/inputs/ipmi_sensor) * [iptables](./plugins/inputs/iptables) diff --git a/agent/accumulator.go b/agent/accumulator.go index 0d682d285..1f9e2270d 100644 --- a/agent/accumulator.go +++ b/agent/accumulator.go @@ -2,10 +2,14 @@ package agent import ( "log" - "sync/atomic" "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/selfstat" +) + +var ( + NErrors = selfstat.Register("agent", "gather_errors", map[string]string{}) ) type MetricMaker interface { @@ -37,8 +41,6 @@ type accumulator struct { maker MetricMaker precision time.Duration - - errCount uint64 } func (ac *accumulator) AddFields( @@ -80,7 +82,7 @@ func (ac *accumulator) AddError(err error) { if err == nil { return } - atomic.AddUint64(&ac.errCount, 1) + NErrors.Incr(1) //TODO suppress/throttle consecutive duplicate errors? log.Printf("E! Error in plugin [%s]: %s", ac.maker.Name(), err) } diff --git a/agent/accumulator_test.go b/agent/accumulator_test.go index 6ff8b9224..e13446bac 100644 --- a/agent/accumulator_test.go +++ b/agent/accumulator_test.go @@ -88,7 +88,7 @@ func TestAccAddError(t *testing.T) { a.AddError(fmt.Errorf("baz")) errs := bytes.Split(errBuf.Bytes(), []byte{'\n'}) - assert.EqualValues(t, 3, a.errCount) + assert.EqualValues(t, int64(3), NErrors.Get()) require.Len(t, errs, 4) // 4 because of trailing newline assert.Contains(t, string(errs[0]), "TestPlugin") assert.Contains(t, string(errs[0]), "foo") diff --git a/agent/agent.go b/agent/agent.go index 9bfa66f02..ab64154e0 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -12,6 +12,7 @@ import ( "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal/config" "github.com/influxdata/telegraf/internal/models" + "github.com/influxdata/telegraf/selfstat" ) // Agent runs telegraf and collects data based on the given config @@ -44,8 +45,6 @@ func NewAgent(config *config.Config) (*Agent, error) { // Connect connects to all configured outputs func (a *Agent) Connect() error { for _, o := range a.Config.Outputs { - o.Quiet = a.Config.Agent.Quiet - switch ot := o.Output.(type) { case telegraf.ServiceOutput: if err := ot.Start(); err != nil { @@ -106,24 +105,26 @@ func (a *Agent) gatherer( ) { defer panicRecover(input) + GatherTime := selfstat.RegisterTiming("gather", + "gather_time_ns", + map[string]string{"input": input.Config.Name}, + ) + + acc := NewAccumulator(input, metricC) + acc.SetPrecision(a.Config.Agent.Precision.Duration, + a.Config.Agent.Interval.Duration) + ticker := time.NewTicker(interval) defer ticker.Stop() for { - acc := NewAccumulator(input, metricC) - acc.SetPrecision(a.Config.Agent.Precision.Duration, - a.Config.Agent.Interval.Duration) - input.SetDebug(a.Config.Agent.Debug) - input.SetDefaultTags(a.Config.Tags) - internal.RandomSleep(a.Config.Agent.CollectionJitter.Duration, shutdown) start := time.Now() gatherWithTimeout(shutdown, input, acc, interval) elapsed := time.Since(start) - log.Printf("D! Input [%s] gathered metrics, (%s interval) in %s\n", - input.Name(), interval, elapsed) + GatherTime.Incr(elapsed.Nanoseconds()) select { case <-shutdown: @@ -204,9 +205,6 @@ func (a *Agent) Test() error { if err := input.Input.Gather(acc); err != nil { return err } - if acc.errCount > 0 { - return fmt.Errorf("Errors encountered during processing") - } // Special instructions for some inputs. cpu, for example, needs to be // run twice in order to return cpu usage percentages. @@ -327,13 +325,13 @@ func (a *Agent) Run(shutdown chan struct{}) error { // Start all ServicePlugins for _, input := range a.Config.Inputs { + input.SetDefaultTags(a.Config.Tags) switch p := input.Input.(type) { case telegraf.ServiceInput: acc := NewAccumulator(input, metricC) // Service input plugins should set their own precision of their // metrics. acc.SetPrecision(time.Nanosecond, 0) - input.SetDefaultTags(a.Config.Tags) if err := p.Start(acc); err != nil { log.Printf("E! Service for input %s failed to start, exiting\n%s\n", input.Name(), err.Error()) diff --git a/internal/buffer/buffer.go b/internal/buffer/buffer.go index 58cd1c376..5e7818ef1 100644 --- a/internal/buffer/buffer.go +++ b/internal/buffer/buffer.go @@ -4,15 +4,17 @@ import ( "sync" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/selfstat" +) + +var ( + MetricsWritten = selfstat.Register("agent", "metrics_written", map[string]string{}) + MetricsDropped = selfstat.Register("agent", "metrics_dropped", map[string]string{}) ) // Buffer is an object for storing metrics in a circular buffer. type Buffer struct { buf chan telegraf.Metric - // total dropped metrics - drops int - // total metrics added - total int mu sync.Mutex } @@ -36,25 +38,14 @@ func (b *Buffer) Len() int { return len(b.buf) } -// Drops returns the total number of dropped metrics that have occured in this -// buffer since instantiation. -func (b *Buffer) Drops() int { - return b.drops -} - -// Total returns the total number of metrics that have been added to this buffer. -func (b *Buffer) Total() int { - return b.total -} - // Add adds metrics to the buffer. func (b *Buffer) Add(metrics ...telegraf.Metric) { for i, _ := range metrics { - b.total++ + MetricsWritten.Incr(1) select { case b.buf <- metrics[i]: default: - b.drops++ + MetricsDropped.Incr(1) <-b.buf b.buf <- metrics[i] } diff --git a/internal/buffer/buffer_test.go b/internal/buffer/buffer_test.go index 9a36f4d84..f84d8c66d 100644 --- a/internal/buffer/buffer_test.go +++ b/internal/buffer/buffer_test.go @@ -27,47 +27,53 @@ func BenchmarkAddMetrics(b *testing.B) { func TestNewBufferBasicFuncs(t *testing.T) { b := NewBuffer(10) + MetricsDropped.Set(0) + MetricsWritten.Set(0) assert.True(t, b.IsEmpty()) assert.Zero(t, b.Len()) - assert.Zero(t, b.Drops()) - assert.Zero(t, b.Total()) + assert.Zero(t, MetricsDropped.Get()) + assert.Zero(t, MetricsWritten.Get()) m := testutil.TestMetric(1, "mymetric") b.Add(m) assert.False(t, b.IsEmpty()) assert.Equal(t, b.Len(), 1) - assert.Equal(t, b.Drops(), 0) - assert.Equal(t, b.Total(), 1) + assert.Equal(t, int64(0), MetricsDropped.Get()) + assert.Equal(t, int64(1), MetricsWritten.Get()) b.Add(metricList...) assert.False(t, b.IsEmpty()) assert.Equal(t, b.Len(), 6) - assert.Equal(t, b.Drops(), 0) - assert.Equal(t, b.Total(), 6) + assert.Equal(t, int64(0), MetricsDropped.Get()) + assert.Equal(t, int64(6), MetricsWritten.Get()) } func TestDroppingMetrics(t *testing.T) { b := NewBuffer(10) + MetricsDropped.Set(0) + MetricsWritten.Set(0) // Add up to the size of the buffer b.Add(metricList...) b.Add(metricList...) assert.False(t, b.IsEmpty()) assert.Equal(t, b.Len(), 10) - assert.Equal(t, b.Drops(), 0) - assert.Equal(t, b.Total(), 10) + assert.Equal(t, int64(0), MetricsDropped.Get()) + assert.Equal(t, int64(10), MetricsWritten.Get()) // Add 5 more and verify they were dropped b.Add(metricList...) assert.False(t, b.IsEmpty()) assert.Equal(t, b.Len(), 10) - assert.Equal(t, b.Drops(), 5) - assert.Equal(t, b.Total(), 15) + assert.Equal(t, int64(5), MetricsDropped.Get()) + assert.Equal(t, int64(15), MetricsWritten.Get()) } func TestGettingBatches(t *testing.T) { b := NewBuffer(20) + MetricsDropped.Set(0) + MetricsWritten.Set(0) // Verify that the buffer returned is smaller than requested when there are // not as many items as requested. @@ -78,8 +84,8 @@ func TestGettingBatches(t *testing.T) { // Verify that the buffer is now empty assert.True(t, b.IsEmpty()) assert.Zero(t, b.Len()) - assert.Zero(t, b.Drops()) - assert.Equal(t, b.Total(), 5) + assert.Zero(t, MetricsDropped.Get()) + assert.Equal(t, int64(5), MetricsWritten.Get()) // Verify that the buffer returned is not more than the size requested b.Add(metricList...) @@ -89,6 +95,6 @@ func TestGettingBatches(t *testing.T) { // Verify that buffer is not empty assert.False(t, b.IsEmpty()) assert.Equal(t, b.Len(), 2) - assert.Equal(t, b.Drops(), 0) - assert.Equal(t, b.Total(), 10) + assert.Equal(t, int64(0), MetricsDropped.Get()) + assert.Equal(t, int64(10), MetricsWritten.Get()) } diff --git a/internal/config/config.go b/internal/config/config.go index 2c2199dac..24dec4169 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -821,10 +821,7 @@ func (c *Config) addInput(name string, table *ast.Table) error { return err } - rp := &models.RunningInput{ - Input: input, - Config: pluginConfig, - } + rp := models.NewRunningInput(input, pluginConfig) c.Inputs = append(c.Inputs, rp) return nil } diff --git a/internal/models/makemetric.go b/internal/models/makemetric.go index 753dfad15..4e398043a 100644 --- a/internal/models/makemetric.go +++ b/internal/models/makemetric.go @@ -32,7 +32,6 @@ func makemetric( daemonTags map[string]string, filter Filter, applyFilter bool, - debug bool, mType telegraf.ValueType, t time.Time, ) telegraf.Metric { @@ -123,11 +122,9 @@ func makemetric( case float64: // NaNs are invalid values in influxdb, skip measurement if math.IsNaN(val) || math.IsInf(val, 0) { - if debug { - log.Printf("Measurement [%s] field [%s] has a NaN or Inf "+ - "field, skipping", - measurement, k) - } + log.Printf("D! Measurement [%s] field [%s] has a NaN or Inf "+ + "field, skipping", + measurement, k) delete(fields, k) continue } diff --git a/internal/models/running_aggregator.go b/internal/models/running_aggregator.go index 1d259005e..2e22f1569 100644 --- a/internal/models/running_aggregator.go +++ b/internal/models/running_aggregator.go @@ -66,7 +66,6 @@ func (r *RunningAggregator) MakeMetric( nil, r.Config.Filter, false, - false, mType, t, ) diff --git a/internal/models/running_input.go b/internal/models/running_input.go index 558af3e5c..4279a7f62 100644 --- a/internal/models/running_input.go +++ b/internal/models/running_input.go @@ -5,15 +5,34 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/selfstat" ) +var GlobalMetricsGathered = selfstat.Register("agent", "metrics_gathered", map[string]string{}) + type RunningInput struct { Input telegraf.Input Config *InputConfig trace bool - debug bool defaultTags map[string]string + + MetricsGathered selfstat.Stat +} + +func NewRunningInput( + input telegraf.Input, + config *InputConfig, +) *RunningInput { + return &RunningInput{ + Input: input, + Config: config, + MetricsGathered: selfstat.Register( + "gather", + "metrics_gathered", + map[string]string{"input": config.Name}, + ), + } } // InputConfig containing a name, interval, and filter @@ -51,7 +70,6 @@ func (r *RunningInput) MakeMetric( r.defaultTags, r.Config.Filter, true, - r.debug, mType, t, ) @@ -60,17 +78,11 @@ func (r *RunningInput) MakeMetric( fmt.Println("> " + m.String()) } + r.MetricsGathered.Incr(1) + GlobalMetricsGathered.Incr(1) return m } -func (r *RunningInput) Debug() bool { - return r.debug -} - -func (r *RunningInput) SetDebug(debug bool) { - r.debug = debug -} - func (r *RunningInput) Trace() bool { return r.trace } diff --git a/internal/models/running_input_test.go b/internal/models/running_input_test.go index d5fd5ae0c..33c785c6e 100644 --- a/internal/models/running_input_test.go +++ b/internal/models/running_input_test.go @@ -13,11 +13,9 @@ import ( func TestMakeMetricNoFields(t *testing.T) { now := time.Now() - ri := RunningInput{ - Config: &InputConfig{ - Name: "TestRunningInput", - }, - } + ri := NewRunningInput(&testInput{}, &InputConfig{ + Name: "TestRunningInput", + }) m := ri.MakeMetric( "RITest", @@ -32,11 +30,9 @@ func TestMakeMetricNoFields(t *testing.T) { // nil fields should get dropped func TestMakeMetricNilFields(t *testing.T) { now := time.Now() - ri := RunningInput{ - Config: &InputConfig{ - Name: "TestRunningInput", - }, - } + ri := NewRunningInput(&testInput{}, &InputConfig{ + Name: "TestRunningInput", + }) m := ri.MakeMetric( "RITest", @@ -58,13 +54,10 @@ func TestMakeMetricNilFields(t *testing.T) { // make an untyped, counter, & gauge metric func TestMakeMetric(t *testing.T) { now := time.Now() - ri := RunningInput{ - Config: &InputConfig{ - Name: "TestRunningInput", - }, - } - ri.SetDebug(true) - assert.Equal(t, true, ri.Debug()) + ri := NewRunningInput(&testInput{}, &InputConfig{ + Name: "TestRunningInput", + }) + ri.SetTrace(true) assert.Equal(t, true, ri.Trace()) assert.Equal(t, "inputs.TestRunningInput", ri.Name()) @@ -126,16 +119,13 @@ func TestMakeMetric(t *testing.T) { func TestMakeMetricWithPluginTags(t *testing.T) { now := time.Now() - ri := RunningInput{ - Config: &InputConfig{ - Name: "TestRunningInput", - Tags: map[string]string{ - "foo": "bar", - }, + ri := NewRunningInput(&testInput{}, &InputConfig{ + Name: "TestRunningInput", + Tags: map[string]string{ + "foo": "bar", }, - } - ri.SetDebug(true) - assert.Equal(t, true, ri.Debug()) + }) + ri.SetTrace(true) assert.Equal(t, true, ri.Trace()) @@ -155,17 +145,14 @@ func TestMakeMetricWithPluginTags(t *testing.T) { func TestMakeMetricFilteredOut(t *testing.T) { now := time.Now() - ri := RunningInput{ - Config: &InputConfig{ - Name: "TestRunningInput", - Tags: map[string]string{ - "foo": "bar", - }, - Filter: Filter{NamePass: []string{"foobar"}}, + ri := NewRunningInput(&testInput{}, &InputConfig{ + Name: "TestRunningInput", + Tags: map[string]string{ + "foo": "bar", }, - } - ri.SetDebug(true) - assert.Equal(t, true, ri.Debug()) + Filter: Filter{NamePass: []string{"foobar"}}, + }) + ri.SetTrace(true) assert.Equal(t, true, ri.Trace()) assert.NoError(t, ri.Config.Filter.Compile()) @@ -182,16 +169,13 @@ func TestMakeMetricFilteredOut(t *testing.T) { func TestMakeMetricWithDaemonTags(t *testing.T) { now := time.Now() - ri := RunningInput{ - Config: &InputConfig{ - Name: "TestRunningInput", - }, - } + ri := NewRunningInput(&testInput{}, &InputConfig{ + Name: "TestRunningInput", + }) ri.SetDefaultTags(map[string]string{ "foo": "bar", }) - ri.SetDebug(true) - assert.Equal(t, true, ri.Debug()) + ri.SetTrace(true) assert.Equal(t, true, ri.Trace()) @@ -214,13 +198,10 @@ func TestMakeMetricInfFields(t *testing.T) { inf := math.Inf(1) ninf := math.Inf(-1) now := time.Now() - ri := RunningInput{ - Config: &InputConfig{ - Name: "TestRunningInput", - }, - } - ri.SetDebug(true) - assert.Equal(t, true, ri.Debug()) + ri := NewRunningInput(&testInput{}, &InputConfig{ + Name: "TestRunningInput", + }) + ri.SetTrace(true) assert.Equal(t, true, ri.Trace()) @@ -244,13 +225,10 @@ func TestMakeMetricInfFields(t *testing.T) { func TestMakeMetricAllFieldTypes(t *testing.T) { now := time.Now() - ri := RunningInput{ - Config: &InputConfig{ - Name: "TestRunningInput", - }, - } - ri.SetDebug(true) - assert.Equal(t, true, ri.Debug()) + ri := NewRunningInput(&testInput{}, &InputConfig{ + Name: "TestRunningInput", + }) + ri.SetTrace(true) assert.Equal(t, true, ri.Trace()) @@ -293,12 +271,10 @@ func TestMakeMetricAllFieldTypes(t *testing.T) { func TestMakeMetricNameOverride(t *testing.T) { now := time.Now() - ri := RunningInput{ - Config: &InputConfig{ - Name: "TestRunningInput", - NameOverride: "foobar", - }, - } + ri := NewRunningInput(&testInput{}, &InputConfig{ + Name: "TestRunningInput", + NameOverride: "foobar", + }) m := ri.MakeMetric( "RITest", @@ -316,12 +292,10 @@ func TestMakeMetricNameOverride(t *testing.T) { func TestMakeMetricNamePrefix(t *testing.T) { now := time.Now() - ri := RunningInput{ - Config: &InputConfig{ - Name: "TestRunningInput", - MeasurementPrefix: "foobar_", - }, - } + ri := NewRunningInput(&testInput{}, &InputConfig{ + Name: "TestRunningInput", + MeasurementPrefix: "foobar_", + }) m := ri.MakeMetric( "RITest", @@ -339,12 +313,10 @@ func TestMakeMetricNamePrefix(t *testing.T) { func TestMakeMetricNameSuffix(t *testing.T) { now := time.Now() - ri := RunningInput{ - Config: &InputConfig{ - Name: "TestRunningInput", - MeasurementSuffix: "_foobar", - }, - } + ri := NewRunningInput(&testInput{}, &InputConfig{ + Name: "TestRunningInput", + MeasurementSuffix: "_foobar", + }) m := ri.MakeMetric( "RITest", @@ -359,3 +331,9 @@ func TestMakeMetricNameSuffix(t *testing.T) { m.String(), ) } + +type testInput struct{} + +func (t *testInput) Description() string { return "" } +func (t *testInput) SampleConfig() string { return "" } +func (t *testInput) Gather(acc telegraf.Accumulator) error { return nil } diff --git a/internal/models/running_output.go b/internal/models/running_output.go index 9f2f2bf5f..0ae78c983 100644 --- a/internal/models/running_output.go +++ b/internal/models/running_output.go @@ -7,6 +7,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal/buffer" "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/selfstat" ) const ( @@ -22,10 +23,15 @@ type RunningOutput struct { Name string Output telegraf.Output Config *OutputConfig - Quiet bool MetricBufferLimit int MetricBatchSize int + MetricsFiltered selfstat.Stat + MetricsWritten selfstat.Stat + BufferSize selfstat.Stat + BufferLimit selfstat.Stat + WriteTime selfstat.Stat + metrics *buffer.Buffer failMetrics *buffer.Buffer } @@ -51,7 +57,33 @@ func NewRunningOutput( Config: conf, MetricBufferLimit: bufferLimit, MetricBatchSize: batchSize, + MetricsWritten: selfstat.Register( + "write", + "metrics_written", + map[string]string{"output": name}, + ), + MetricsFiltered: selfstat.Register( + "write", + "metrics_filtered", + map[string]string{"output": name}, + ), + BufferSize: selfstat.Register( + "write", + "buffer_size", + map[string]string{"output": name}, + ), + BufferLimit: selfstat.Register( + "write", + "buffer_limit", + map[string]string{"output": name}, + ), + WriteTime: selfstat.RegisterTiming( + "write", + "write_time_ns", + map[string]string{"output": name}, + ), } + ro.BufferLimit.Incr(int64(ro.MetricBufferLimit)) return ro } @@ -67,6 +99,7 @@ func (ro *RunningOutput) AddMetric(m telegraf.Metric) { fields := m.Fields() t := m.Time() if ok := ro.Config.Filter.Apply(name, fields, tags); !ok { + ro.MetricsFiltered.Incr(1) return } // error is not possible if creating from another metric, so ignore. @@ -85,28 +118,21 @@ func (ro *RunningOutput) AddMetric(m telegraf.Metric) { // Write writes all cached points to this output. func (ro *RunningOutput) Write() error { - if !ro.Quiet { - log.Printf("I! Output [%s] buffer fullness: %d / %d metrics. "+ - "Total gathered metrics: %d. Total dropped metrics: %d.", - ro.Name, - ro.failMetrics.Len()+ro.metrics.Len(), - ro.MetricBufferLimit, - ro.metrics.Total(), - ro.metrics.Drops()+ro.failMetrics.Drops()) - } - + nFails, nMetrics := ro.failMetrics.Len(), ro.metrics.Len() + log.Printf("D! Output [%s] buffer fullness: %d / %d metrics. ", + ro.Name, nFails+nMetrics, ro.MetricBufferLimit) + ro.BufferSize.Incr(int64(nFails + nMetrics)) var err error if !ro.failMetrics.IsEmpty() { - bufLen := ro.failMetrics.Len() // how many batches of failed writes we need to write. - nBatches := bufLen/ro.MetricBatchSize + 1 + nBatches := nFails/ro.MetricBatchSize + 1 batchSize := ro.MetricBatchSize for i := 0; i < nBatches; i++ { // If it's the last batch, only grab the metrics that have not had // a write attempt already (this is primarily to preserve order). if i == nBatches-1 { - batchSize = bufLen % ro.MetricBatchSize + batchSize = nFails % ro.MetricBatchSize } batch := ro.failMetrics.Batch(batchSize) // If we've already failed previous writes, don't bother trying to @@ -127,6 +153,7 @@ func (ro *RunningOutput) Write() error { if err == nil { err = ro.write(batch) } + if err != nil { ro.failMetrics.Add(batch...) return err @@ -135,17 +162,19 @@ func (ro *RunningOutput) Write() error { } func (ro *RunningOutput) write(metrics []telegraf.Metric) error { - if metrics == nil || len(metrics) == 0 { + nMetrics := len(metrics) + if nMetrics == 0 { return nil } start := time.Now() err := ro.Output.Write(metrics) elapsed := time.Since(start) if err == nil { - if !ro.Quiet { - log.Printf("I! Output [%s] wrote batch of %d metrics in %s\n", - ro.Name, len(metrics), elapsed) - } + log.Printf("D! Output [%s] wrote batch of %d metrics in %s\n", + ro.Name, nMetrics, elapsed) + ro.MetricsWritten.Incr(int64(nMetrics)) + ro.BufferSize.Incr(-int64(nMetrics)) + ro.WriteTime.Incr(elapsed.Nanoseconds()) } return err } diff --git a/internal/models/running_output_test.go b/internal/models/running_output_test.go index 2555025fa..c55334218 100644 --- a/internal/models/running_output_test.go +++ b/internal/models/running_output_test.go @@ -36,7 +36,6 @@ func BenchmarkRunningOutputAddWrite(b *testing.B) { m := &perfOutput{} ro := NewRunningOutput("test", m, conf, 1000, 10000) - ro.Quiet = true for n := 0; n < b.N; n++ { ro.AddMetric(testutil.TestMetric(101, "metric1")) @@ -52,7 +51,6 @@ func BenchmarkRunningOutputAddWriteEvery100(b *testing.B) { m := &perfOutput{} ro := NewRunningOutput("test", m, conf, 1000, 10000) - ro.Quiet = true for n := 0; n < b.N; n++ { ro.AddMetric(testutil.TestMetric(101, "metric1")) @@ -71,7 +69,6 @@ func BenchmarkRunningOutputAddFailWrites(b *testing.B) { m := &perfOutput{} m.failWrite = true ro := NewRunningOutput("test", m, conf, 1000, 10000) - ro.Quiet = true for n := 0; n < b.N; n++ { ro.AddMetric(testutil.TestMetric(101, "metric1")) diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 67b85905e..7846f8c9a 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -27,6 +27,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/http_response" _ "github.com/influxdata/telegraf/plugins/inputs/httpjson" _ "github.com/influxdata/telegraf/plugins/inputs/influxdb" + _ "github.com/influxdata/telegraf/plugins/inputs/internal" _ "github.com/influxdata/telegraf/plugins/inputs/ipmi_sensor" _ "github.com/influxdata/telegraf/plugins/inputs/iptables" _ "github.com/influxdata/telegraf/plugins/inputs/jolokia" diff --git a/plugins/inputs/http_listener/http_listener.go b/plugins/inputs/http_listener/http_listener.go index 3d3467d71..0f426f809 100644 --- a/plugins/inputs/http_listener/http_listener.go +++ b/plugins/inputs/http_listener/http_listener.go @@ -14,6 +14,7 @@ import ( "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers/influx" + "github.com/influxdata/telegraf/selfstat" ) const ( @@ -43,6 +44,18 @@ type HTTPListener struct { parser influx.InfluxParser acc telegraf.Accumulator pool *pool + + BytesRecv selfstat.Stat + RequestsServed selfstat.Stat + WritesServed selfstat.Stat + QueriesServed selfstat.Stat + PingsServed selfstat.Stat + RequestsRecv selfstat.Stat + WritesRecv selfstat.Stat + QueriesRecv selfstat.Stat + PingsRecv selfstat.Stat + NotFoundsServed selfstat.Stat + BuffersCreated selfstat.Stat } const sampleConfig = ` @@ -72,7 +85,7 @@ func (h *HTTPListener) Description() string { } func (h *HTTPListener) Gather(_ telegraf.Accumulator) error { - log.Printf("D! The http_listener has created %d buffers", h.pool.ncreated()) + h.BuffersCreated.Set(h.pool.ncreated()) return nil } @@ -81,6 +94,21 @@ func (h *HTTPListener) Start(acc telegraf.Accumulator) error { h.mu.Lock() defer h.mu.Unlock() + tags := map[string]string{ + "address": h.ServiceAddress, + } + h.BytesRecv = selfstat.Register("http_listener", "bytes_received", tags) + h.RequestsServed = selfstat.Register("http_listener", "requests_served", tags) + h.WritesServed = selfstat.Register("http_listener", "writes_served", tags) + h.QueriesServed = selfstat.Register("http_listener", "queries_served", tags) + h.PingsServed = selfstat.Register("http_listener", "pings_served", tags) + h.RequestsRecv = selfstat.Register("http_listener", "requests_received", tags) + h.WritesRecv = selfstat.Register("http_listener", "writes_received", tags) + h.QueriesRecv = selfstat.Register("http_listener", "queries_received", tags) + h.PingsRecv = selfstat.Register("http_listener", "pings_received", tags) + h.NotFoundsServed = selfstat.Register("http_listener", "not_founds_served", tags) + h.BuffersCreated = selfstat.Register("http_listener", "buffers_created", tags) + if h.MaxBodySize == 0 { h.MaxBodySize = DEFAULT_MAX_BODY_SIZE } @@ -141,10 +169,16 @@ func (h *HTTPListener) httpListen() error { } func (h *HTTPListener) ServeHTTP(res http.ResponseWriter, req *http.Request) { + h.RequestsRecv.Incr(1) + defer h.RequestsServed.Incr(1) switch req.URL.Path { case "/write": + h.WritesRecv.Incr(1) + defer h.WritesServed.Incr(1) h.serveWrite(res, req) case "/query": + h.QueriesRecv.Incr(1) + defer h.QueriesServed.Incr(1) // Deliver a dummy response to the query endpoint, as some InfluxDB // clients test endpoint availability with a query res.Header().Set("Content-Type", "application/json") @@ -152,9 +186,12 @@ func (h *HTTPListener) ServeHTTP(res http.ResponseWriter, req *http.Request) { res.WriteHeader(http.StatusOK) res.Write([]byte("{\"results\":[]}")) case "/ping": + h.PingsRecv.Incr(1) + defer h.PingsServed.Incr(1) // respond to ping requests res.WriteHeader(http.StatusNoContent) default: + defer h.NotFoundsServed.Incr(1) // Don't know how to respond to calls to other endpoints http.NotFound(res, req) } @@ -195,6 +232,7 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) { badRequest(res) return } + h.BytesRecv.Incr(int64(n)) if err == io.EOF { if return400 { diff --git a/plugins/inputs/internal/README.md b/plugins/inputs/internal/README.md new file mode 100644 index 000000000..4e6d3795b --- /dev/null +++ b/plugins/inputs/internal/README.md @@ -0,0 +1,83 @@ +# Internal Input Plugin + +The `internal` plugin collects metrics about the telegraf agent itself. + +Note that some metrics are aggregates across all instances of one type of +plugin. + +### Configuration: + +```toml +# Collect statistics about itself +[[inputs.internal]] + ## If true, collect telegraf memory stats. + # collect_memstats = true +``` + +### Measurements & Fields: + +memstats are taken from the Go runtime: https://golang.org/pkg/runtime/#MemStats + +- internal\_memstats + - alloc\_bytes + - frees + - heap\_alloc\_bytes + - heap\_idle\_bytes + - heap\_in\_use\_bytes + - heap\_objects\_bytes + - heap\_released\_bytes + - heap\_sys\_bytes + - mallocs + - num\_gc + - pointer\_lookups + - sys\_bytes + - total\_alloc\_bytes + +agent stats collect aggregate stats on all telegraf plugins. + +- internal\_agent + - gather\_errors + - metrics\_dropped + - metrics\_gathered + - metrics\_written + +internal\_gather stats collect aggregate stats on all input plugins +that are of the same input type. They are tagged with `input=`. + +- internal\_gather + - gather\_time\_ns + - metrics\_gathered + +internal\_write stats collect aggregate stats on all output plugins +that are of the same input type. They are tagged with `output=`. + + +- internal\_write + - buffer\_limit + - buffer\_size + - metrics\_written + - metrics\_filtered + - write\_time\_ns + +internal\_\ are metrics which are defined on a per-plugin basis, and +usually contain tags which differentiate each instance of a particular type of +plugin. + +- internal\_\ + - individual plugin-specific fields, such as requests counts. + +### Tags: + +All measurements for specific plugins are tagged with information relevant +to each particular plugin. + +### Example Output: + +``` +internal_memstats,host=tyrion alloc_bytes=4457408i,sys_bytes=10590456i,pointer_lookups=7i,mallocs=17642i,frees=7473i,heap_sys_bytes=6848512i,heap_idle_bytes=1368064i,heap_in_use_bytes=5480448i,heap_released_bytes=0i,total_alloc_bytes=6875560i,heap_alloc_bytes=4457408i,heap_objects_bytes=10169i,num_gc=2i 1480682800000000000 +internal_agent,host=tyrion metrics_written=18i,metrics_dropped=0i,metrics_gathered=19i,gather_errors=0i 1480682800000000000 +internal_write,output=file,host=tyrion buffer_limit=10000i,write_time_ns=636609i,metrics_written=18i,buffer_size=0i 1480682800000000000 +internal_gather,input=internal,host=tyrion metrics_gathered=19i,gather_time_ns=442114i 1480682800000000000 +internal_gather,input=http_listener,host=tyrion metrics_gathered=0i,gather_time_ns=167285i 1480682800000000000 +internal_http_listener,address=:8186,host=tyrion queries_received=0i,writes_received=0i,requests_received=0i,buffers_created=0i,requests_served=0i,pings_received=0i,bytes_received=0i,not_founds_served=0i,pings_served=0i,queries_served=0i,writes_served=0i 1480682800000000000 +``` \ No newline at end of file diff --git a/plugins/inputs/internal/internal.go b/plugins/inputs/internal/internal.go new file mode 100644 index 000000000..f6123edd5 --- /dev/null +++ b/plugins/inputs/internal/internal.go @@ -0,0 +1,66 @@ +package internal + +import ( + "runtime" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf/selfstat" +) + +type Self struct { + CollectMemstats bool +} + +func NewSelf() telegraf.Input { + return &Self{ + CollectMemstats: true, + } +} + +var sampleConfig = ` + ## If true, collect telegraf memory stats. + # collect_memstats = true +` + +func (s *Self) Description() string { + return "Collect statistics about itself" +} + +func (s *Self) SampleConfig() string { + return sampleConfig +} + +func (s *Self) Gather(acc telegraf.Accumulator) error { + if s.CollectMemstats { + m := &runtime.MemStats{} + runtime.ReadMemStats(m) + fields := map[string]interface{}{ + "alloc_bytes": m.Alloc, // bytes allocated and not yet freed + "total_alloc_bytes": m.TotalAlloc, // bytes allocated (even if freed) + "sys_bytes": m.Sys, // bytes obtained from system (sum of XxxSys below) + "pointer_lookups": m.Lookups, // number of pointer lookups + "mallocs": m.Mallocs, // number of mallocs + "frees": m.Frees, // number of frees + // Main allocation heap statistics. + "heap_alloc_bytes": m.HeapAlloc, // bytes allocated and not yet freed (same as Alloc above) + "heap_sys_bytes": m.HeapSys, // bytes obtained from system + "heap_idle_bytes": m.HeapIdle, // bytes in idle spans + "heap_in_use_bytes": m.HeapInuse, // bytes in non-idle span + "heap_released_bytes": m.HeapReleased, // bytes released to the OS + "heap_objects_bytes": m.HeapObjects, // total number of allocated objects + "num_gc": m.NumGC, + } + acc.AddFields("internal_memstats", fields, map[string]string{}) + } + + for _, m := range selfstat.Metrics() { + acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) + } + + return nil +} + +func init() { + inputs.Add("internal", NewSelf) +} diff --git a/plugins/inputs/internal/internal_test.go b/plugins/inputs/internal/internal_test.go new file mode 100644 index 000000000..b17c53038 --- /dev/null +++ b/plugins/inputs/internal/internal_test.go @@ -0,0 +1,62 @@ +package internal + +import ( + "testing" + + "github.com/influxdata/telegraf/selfstat" + "github.com/influxdata/telegraf/testutil" + + "github.com/stretchr/testify/assert" +) + +func TestSelfPlugin(t *testing.T) { + s := NewSelf() + acc := &testutil.Accumulator{} + + s.Gather(acc) + assert.True(t, acc.HasMeasurement("internal_memstats")) + + // test that a registered stat is incremented + stat := selfstat.Register("mytest", "test", map[string]string{"test": "foo"}) + stat.Incr(1) + stat.Incr(2) + s.Gather(acc) + acc.AssertContainsTaggedFields(t, "internal_mytest", + map[string]interface{}{ + "test": int64(3), + }, + map[string]string{ + "test": "foo", + }, + ) + acc.ClearMetrics() + + // test that a registered stat is set properly + stat.Set(101) + s.Gather(acc) + acc.AssertContainsTaggedFields(t, "internal_mytest", + map[string]interface{}{ + "test": int64(101), + }, + map[string]string{ + "test": "foo", + }, + ) + acc.ClearMetrics() + + // test that regular and timing stats can share the same measurement, and + // that timings are set properly. + timing := selfstat.RegisterTiming("mytest", "test_ns", map[string]string{"test": "foo"}) + timing.Incr(100) + timing.Incr(200) + s.Gather(acc) + acc.AssertContainsTaggedFields(t, "internal_mytest", + map[string]interface{}{ + "test": int64(101), + "test_ns": int64(150), + }, + map[string]string{ + "test": "foo", + }, + ) +} diff --git a/plugins/inputs/tcp_listener/tcp_listener.go b/plugins/inputs/tcp_listener/tcp_listener.go index 41c8fd481..3ce4d87b4 100644 --- a/plugins/inputs/tcp_listener/tcp_listener.go +++ b/plugins/inputs/tcp_listener/tcp_listener.go @@ -11,6 +11,7 @@ import ( "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/selfstat" ) type TcpListener struct { @@ -41,6 +42,12 @@ type TcpListener struct { parser parsers.Parser acc telegraf.Accumulator + + MaxConnections selfstat.Stat + CurrentConnections selfstat.Stat + TotalConnections selfstat.Stat + PacketsRecv selfstat.Stat + BytesRecv selfstat.Stat } var dropwarn = "E! Error: tcp_listener message queue full. " + @@ -91,6 +98,16 @@ func (t *TcpListener) Start(acc telegraf.Accumulator) error { t.Lock() defer t.Unlock() + tags := map[string]string{ + "address": t.ServiceAddress, + } + t.MaxConnections = selfstat.Register("tcp_listener", "max_connections", tags) + t.MaxConnections.Set(int64(t.MaxTCPConnections)) + t.CurrentConnections = selfstat.Register("tcp_listener", "current_connections", tags) + t.TotalConnections = selfstat.Register("tcp_listener", "total_connections", tags) + t.PacketsRecv = selfstat.Register("tcp_listener", "packets_received", tags) + t.BytesRecv = selfstat.Register("tcp_listener", "bytes_received", tags) + t.acc = acc t.in = make(chan []byte, t.AllowedPendingMessages) t.done = make(chan struct{}) @@ -189,6 +206,8 @@ func (t *TcpListener) refuser(conn *net.TCPConn) { // handler handles a single TCP Connection func (t *TcpListener) handler(conn *net.TCPConn, id string) { + t.CurrentConnections.Incr(1) + t.TotalConnections.Incr(1) // connection cleanup function defer func() { t.wg.Done() @@ -196,6 +215,7 @@ func (t *TcpListener) handler(conn *net.TCPConn, id string) { // Add one connection potential back to channel when this one closes t.accept <- true t.forget(id) + t.CurrentConnections.Incr(-1) }() var n int @@ -212,6 +232,8 @@ func (t *TcpListener) handler(conn *net.TCPConn, id string) { if n == 0 { continue } + t.BytesRecv.Incr(int64(n)) + t.PacketsRecv.Incr(1) bufCopy := make([]byte, n+1) copy(bufCopy, scanner.Bytes()) bufCopy[n] = '\n' diff --git a/plugins/inputs/udp_listener/udp_listener.go b/plugins/inputs/udp_listener/udp_listener.go index 78687feee..518a3fe48 100644 --- a/plugins/inputs/udp_listener/udp_listener.go +++ b/plugins/inputs/udp_listener/udp_listener.go @@ -9,6 +9,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/selfstat" ) // UdpListener main struct for the collector @@ -48,6 +49,9 @@ type UdpListener struct { acc telegraf.Accumulator listener *net.UDPConn + + PacketsRecv selfstat.Stat + BytesRecv selfstat.Stat } // UDP_MAX_PACKET_SIZE is packet limit, see @@ -102,6 +106,12 @@ func (u *UdpListener) Start(acc telegraf.Accumulator) error { u.Lock() defer u.Unlock() + tags := map[string]string{ + "address": u.ServiceAddress, + } + u.PacketsRecv = selfstat.Register("udp_listener", "packets_received", tags) + u.BytesRecv = selfstat.Register("udp_listener", "bytes_received", tags) + u.acc = acc u.in = make(chan []byte, u.AllowedPendingMessages) u.done = make(chan struct{}) @@ -162,6 +172,8 @@ func (u *UdpListener) udpListen() error { } continue } + u.BytesRecv.Incr(int64(n)) + u.PacketsRecv.Incr(1) bufCopy := make([]byte, n) copy(bufCopy, buf[:n]) diff --git a/selfstat/selfstat.go b/selfstat/selfstat.go new file mode 100644 index 000000000..98ecbb4d4 --- /dev/null +++ b/selfstat/selfstat.go @@ -0,0 +1,169 @@ +// selfstat is a package for tracking and collecting internal statistics +// about telegraf. Metrics can be registered using this package, and then +// incremented or set within your code. If the inputs.internal plugin is enabled, +// then all registered stats will be collected as they would by any other input +// plugin. +package selfstat + +import ( + "hash/fnv" + "log" + "sort" + "sync" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" +) + +var ( + registry *rgstry +) + +// Stat is an interface for dealing with telegraf statistics collected +// on itself. +type Stat interface { + // Name is the name of the measurement + Name() string + + // FieldName is the name of the measurement field + FieldName() string + + // Tags is a tag map. Each time this is called a new map is allocated. + Tags() map[string]string + + // Key is the unique measurement+tags key of the stat. + Key() uint64 + + // Incr increments a regular stat by 'v'. + // in the case of a timing stat, increment adds the timing to the cache. + Incr(v int64) + + // Set sets a regular stat to 'v'. + // in the case of a timing stat, set adds the timing to the cache. + Set(v int64) + + // Get gets the value of the stat. In the case of timings, this returns + // an average value of all timings received since the last call to Get(). + // If no timings were received, it returns the previous value. + Get() int64 +} + +// Register registers the given measurement, field, and tags in the selfstat +// registry. If given an identical measurement, it will return the stat that's +// already been registered. +// +// The returned Stat can be incremented by the consumer of Register(), and it's +// value will be returned as a telegraf metric when Metrics() is called. +func Register(measurement, field string, tags map[string]string) Stat { + return registry.register(&stat{ + measurement: "internal_" + measurement, + field: field, + tags: tags, + }) +} + +// RegisterTiming registers the given measurement, field, and tags in the selfstat +// registry. If given an identical measurement, it will return the stat that's +// already been registered. +// +// Timing stats differ from regular stats in that they accumulate multiple +// "timings" added to them, and will return the average when Get() is called. +// After Get() is called, the average is cleared and the next timing returned +// from Get() will only reflect timings added since the previous call to Get(). +// If Get() is called without receiving any new timings, then the previous value +// is used. +// +// In other words, timings are an averaged metric that get cleared on each call +// to Get(). +// +// The returned Stat can be incremented by the consumer of Register(), and it's +// value will be returned as a telegraf metric when Metrics() is called. +func RegisterTiming(measurement, field string, tags map[string]string) Stat { + return registry.register(&timingStat{ + measurement: "internal_" + measurement, + field: field, + tags: tags, + }) +} + +// Metrics returns all registered stats as telegraf metrics. +func Metrics() []telegraf.Metric { + registry.mu.Lock() + now := time.Now() + metrics := make([]telegraf.Metric, len(registry.stats)) + i := 0 + for _, stats := range registry.stats { + if len(stats) > 0 { + var tags map[string]string + var name string + fields := map[string]interface{}{} + j := 0 + for fieldname, stat := range stats { + if j == 0 { + tags = stat.Tags() + name = stat.Name() + } + fields[fieldname] = stat.Get() + j++ + } + metric, err := metric.New(name, tags, fields, now) + if err != nil { + log.Printf("E! Error creating selfstat metric: %s", err) + continue + } + metrics[i] = metric + i++ + } + } + registry.mu.Unlock() + return metrics +} + +type rgstry struct { + stats map[uint64]map[string]Stat + mu sync.Mutex +} + +func (r *rgstry) register(s Stat) Stat { + r.mu.Lock() + defer r.mu.Unlock() + if stats, ok := r.stats[s.Key()]; ok { + // measurement exists + if stat, ok := stats[s.FieldName()]; ok { + // field already exists, so don't create a new one + return stat + } + r.stats[s.Key()][s.FieldName()] = s + return s + } else { + // creating a new unique metric + r.stats[s.Key()] = map[string]Stat{s.FieldName(): s} + return s + } +} + +func key(measurement string, tags map[string]string) uint64 { + h := fnv.New64a() + h.Write([]byte(measurement)) + + tmp := make([]string, len(tags)) + i := 0 + for k, v := range tags { + tmp[i] = k + v + i++ + } + sort.Strings(tmp) + + for _, s := range tmp { + h.Write([]byte(s)) + } + + return h.Sum64() +} + +func init() { + registry = &rgstry{ + stats: make(map[uint64]map[string]Stat), + } +} diff --git a/selfstat/selfstat_test.go b/selfstat/selfstat_test.go new file mode 100644 index 000000000..2de2bd381 --- /dev/null +++ b/selfstat/selfstat_test.go @@ -0,0 +1,221 @@ +package selfstat + +import ( + "sync" + "testing" + + "github.com/influxdata/telegraf/testutil" + + "github.com/stretchr/testify/assert" +) + +var ( + // only allow one test at a time + // this is because we are dealing with a global registry + testLock sync.Mutex + a int64 +) + +// testCleanup resets the global registry for test cleanup & unlocks the test lock +func testCleanup() { + registry = &rgstry{ + stats: make(map[uint64]map[string]Stat), + } + testLock.Unlock() +} + +func BenchmarkStats(b *testing.B) { + testLock.Lock() + defer testCleanup() + b1 := Register("benchmark1", "test_field1", map[string]string{"test": "foo"}) + for n := 0; n < b.N; n++ { + b1.Incr(1) + b1.Incr(3) + a = b1.Get() + } +} + +func BenchmarkTimingStats(b *testing.B) { + testLock.Lock() + defer testCleanup() + b2 := RegisterTiming("benchmark2", "test_field1", map[string]string{"test": "foo"}) + for n := 0; n < b.N; n++ { + b2.Incr(1) + b2.Incr(3) + a = b2.Get() + } +} + +func TestRegisterAndIncrAndSet(t *testing.T) { + testLock.Lock() + defer testCleanup() + s1 := Register("test", "test_field1", map[string]string{"test": "foo"}) + s2 := Register("test", "test_field2", map[string]string{"test": "foo"}) + assert.Equal(t, int64(0), s1.Get()) + + s1.Incr(10) + s1.Incr(5) + assert.Equal(t, int64(15), s1.Get()) + + s1.Set(12) + assert.Equal(t, int64(12), s1.Get()) + + s1.Incr(-2) + assert.Equal(t, int64(10), s1.Get()) + + s2.Set(101) + assert.Equal(t, int64(101), s2.Get()) + + // make sure that the same field returns the same metric + // this one should be the same as s2. + foo := Register("test", "test_field2", map[string]string{"test": "foo"}) + assert.Equal(t, int64(101), foo.Get()) + + // check that tags are consistent + assert.Equal(t, map[string]string{"test": "foo"}, foo.Tags()) + assert.Equal(t, "internal_test", foo.Name()) +} + +func TestRegisterTimingAndIncrAndSet(t *testing.T) { + testLock.Lock() + defer testCleanup() + s1 := RegisterTiming("test", "test_field1_ns", map[string]string{"test": "foo"}) + s2 := RegisterTiming("test", "test_field2_ns", map[string]string{"test": "foo"}) + assert.Equal(t, int64(0), s1.Get()) + + s1.Incr(10) + s1.Incr(5) + assert.Equal(t, int64(7), s1.Get()) + // previous value is used on subsequent calls to Get() + assert.Equal(t, int64(7), s1.Get()) + + s1.Set(12) + assert.Equal(t, int64(12), s1.Get()) + + s1.Incr(-2) + assert.Equal(t, int64(-2), s1.Get()) + + s2.Set(101) + assert.Equal(t, int64(101), s2.Get()) + + // make sure that the same field returns the same metric + // this one should be the same as s2. + foo := RegisterTiming("test", "test_field2_ns", map[string]string{"test": "foo"}) + assert.Equal(t, int64(101), foo.Get()) + + // check that tags are consistent + assert.Equal(t, map[string]string{"test": "foo"}, foo.Tags()) + assert.Equal(t, "internal_test", foo.Name()) +} + +func TestStatKeyConsistency(t *testing.T) { + s := &stat{ + measurement: "internal_stat", + field: "myfield", + tags: map[string]string{ + "foo": "bar", + "bar": "baz", + "whose": "first", + }, + } + k := s.Key() + for i := 0; i < 5000; i++ { + // assert that the Key() func doesn't change anything. + assert.Equal(t, k, s.Key()) + + // assert that two identical measurements always produce the same key. + tmp := &stat{ + measurement: "internal_stat", + field: "myfield", + tags: map[string]string{ + "foo": "bar", + "bar": "baz", + "whose": "first", + }, + } + assert.Equal(t, k, tmp.Key()) + } +} + +func TestRegisterMetricsAndVerify(t *testing.T) { + testLock.Lock() + defer testCleanup() + + // register two metrics with the same key + s1 := RegisterTiming("test_timing", "test_field1_ns", map[string]string{"test": "foo"}) + s2 := RegisterTiming("test_timing", "test_field2_ns", map[string]string{"test": "foo"}) + s1.Incr(10) + s2.Incr(15) + assert.Len(t, Metrics(), 1) + + // register two more metrics with different keys + s3 := RegisterTiming("test_timing", "test_field1_ns", map[string]string{"test": "bar"}) + s4 := RegisterTiming("test_timing", "test_field2_ns", map[string]string{"test": "baz"}) + s3.Incr(10) + s4.Incr(15) + assert.Len(t, Metrics(), 3) + + // register some non-timing metrics + s5 := Register("test", "test_field1", map[string]string{"test": "bar"}) + s6 := Register("test", "test_field2", map[string]string{"test": "baz"}) + Register("test", "test_field3", map[string]string{"test": "baz"}) + s5.Incr(10) + s5.Incr(18) + s6.Incr(15) + assert.Len(t, Metrics(), 5) + + acc := testutil.Accumulator{} + acc.AddMetrics(Metrics()) + + // verify s1 & s2 + acc.AssertContainsTaggedFields(t, "internal_test_timing", + map[string]interface{}{ + "test_field1_ns": int64(10), + "test_field2_ns": int64(15), + }, + map[string]string{ + "test": "foo", + }, + ) + + // verify s3 + acc.AssertContainsTaggedFields(t, "internal_test_timing", + map[string]interface{}{ + "test_field1_ns": int64(10), + }, + map[string]string{ + "test": "bar", + }, + ) + + // verify s4 + acc.AssertContainsTaggedFields(t, "internal_test_timing", + map[string]interface{}{ + "test_field2_ns": int64(15), + }, + map[string]string{ + "test": "baz", + }, + ) + + // verify s5 + acc.AssertContainsTaggedFields(t, "internal_test", + map[string]interface{}{ + "test_field1": int64(28), + }, + map[string]string{ + "test": "bar", + }, + ) + + // verify s6 & s7 + acc.AssertContainsTaggedFields(t, "internal_test", + map[string]interface{}{ + "test_field2": int64(15), + "test_field3": int64(0), + }, + map[string]string{ + "test": "baz", + }, + ) +} diff --git a/selfstat/stat.go b/selfstat/stat.go new file mode 100644 index 000000000..d7ec60a2b --- /dev/null +++ b/selfstat/stat.go @@ -0,0 +1,50 @@ +package selfstat + +import ( + "sync/atomic" +) + +type stat struct { + v int64 + measurement string + field string + tags map[string]string + key uint64 +} + +func (s *stat) Incr(v int64) { + atomic.AddInt64(&s.v, v) +} + +func (s *stat) Set(v int64) { + atomic.StoreInt64(&s.v, v) +} + +func (s *stat) Get() int64 { + return atomic.LoadInt64(&s.v) +} + +func (s *stat) Name() string { + return s.measurement +} + +func (s *stat) FieldName() string { + return s.field +} + +// Tags returns a copy of the stat's tags. +// NOTE this allocates a new map every time it is called. +func (s *stat) Tags() map[string]string { + m := make(map[string]string, len(s.tags)) + for k, v := range s.tags { + m[k] = v + } + return m +} + +func (s *stat) Key() uint64 { + if s.key == 0 { + s.key = key(s.measurement, s.tags) + } + return s.key +} diff --git a/selfstat/timingStat.go b/selfstat/timingStat.go new file mode 100644 index 000000000..ef0ee05aa --- /dev/null +++ b/selfstat/timingStat.go @@ -0,0 +1,66 @@ +package selfstat + +import ( + "sync" +) + +type timingStat struct { + measurement string + field string + tags map[string]string + key uint64 + v int64 + prev int64 + count int64 + mu sync.Mutex +} + +func (s *timingStat) Incr(v int64) { + s.mu.Lock() + s.v += v + s.count++ + s.mu.Unlock() +} + +func (s *timingStat) Set(v int64) { + s.Incr(v) +} + +func (s *timingStat) Get() int64 { + var avg int64 + s.mu.Lock() + if s.count > 0 { + s.prev, avg = s.v/s.count, s.v/s.count + s.v = 0 + s.count = 0 + } else { + avg = s.prev + } + s.mu.Unlock() + return avg +} + +func (s *timingStat) Name() string { + return s.measurement +} + +func (s *timingStat) FieldName() string { + return s.field +} + +// Tags returns a copy of the timingStat's tags. +// NOTE this allocates a new map every time it is called. +func (s *timingStat) Tags() map[string]string { + m := make(map[string]string, len(s.tags)) + for k, v := range s.tags { + m[k] = v + } + return m +} + +func (s *timingStat) Key() uint64 { + if s.key == 0 { + s.key = key(s.measurement, s.tags) + } + return s.key +} diff --git a/testutil/accumulator.go b/testutil/accumulator.go index 11cea2434..2efee5572 100644 --- a/testutil/accumulator.go +++ b/testutil/accumulator.go @@ -9,6 +9,8 @@ import ( "testing" "time" + "github.com/influxdata/telegraf" + "github.com/stretchr/testify/assert" ) @@ -110,6 +112,12 @@ func (a *Accumulator) AddGauge( a.AddFields(measurement, fields, tags, timestamp...) } +func (a *Accumulator) AddMetrics(metrics []telegraf.Metric) { + for _, m := range metrics { + a.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) + } +} + // AddError appends the given error to Accumulator.Errors. func (a *Accumulator) AddError(err error) { if err == nil {