From 2397c53d7db18443874079d5aecdac9e7038d06d Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Wed, 23 Oct 2019 12:40:31 -0700 Subject: [PATCH] Exclude alias tag if unset from plugin internal stats (#6571) --- internal/models/running_aggregator.go | 16 +++-- internal/models/running_input.go | 15 +++-- internal/models/running_output.go | 15 +++-- internal/models/running_processor.go | 11 ++-- selfstat/selfstat.go | 90 +++++++++++++++++++-------- selfstat/selfstat_test.go | 46 ++++++-------- selfstat/stat.go | 7 --- selfstat/timingStat.go | 7 --- 8 files changed, 118 insertions(+), 89 deletions(-) diff --git a/internal/models/running_aggregator.go b/internal/models/running_aggregator.go index 91a10debb..b8957e30a 100644 --- a/internal/models/running_aggregator.go +++ b/internal/models/running_aggregator.go @@ -24,10 +24,14 @@ type RunningAggregator struct { } func NewRunningAggregator(aggregator telegraf.Aggregator, config *AggregatorConfig) *RunningAggregator { + tags := map[string]string{"aggregator": config.Name} + if config.Alias != "" { + tags["alias"] = config.Alias + } + logger := &Logger{ Name: logName("aggregators", config.Name, config.Alias), - Errs: selfstat.Register("aggregate", "errors", - map[string]string{"input": config.Name, "alias": config.Alias}), + Errs: selfstat.Register("aggregate", "errors", tags), } setLogIfExist(aggregator, logger) @@ -38,22 +42,22 @@ func NewRunningAggregator(aggregator telegraf.Aggregator, config *AggregatorConf MetricsPushed: selfstat.Register( "aggregate", "metrics_pushed", - map[string]string{"aggregator": config.Name, "alias": config.Alias}, + tags, ), MetricsFiltered: selfstat.Register( "aggregate", "metrics_filtered", - map[string]string{"aggregator": config.Name, "alias": config.Alias}, + tags, ), MetricsDropped: selfstat.Register( "aggregate", "metrics_dropped", - map[string]string{"aggregator": config.Name, "alias": config.Alias}, + tags, ), PushTime: selfstat.Register( "aggregate", "push_time_ns", - map[string]string{"aggregator": config.Name, "alias": config.Alias}, + tags, ), log: logger, } diff --git a/internal/models/running_input.go b/internal/models/running_input.go index 85f0afb81..c09fb1409 100644 --- a/internal/models/running_input.go +++ b/internal/models/running_input.go @@ -21,12 +21,15 @@ type RunningInput struct { } func NewRunningInput(input telegraf.Input, config *InputConfig) *RunningInput { - logger := &Logger{ - Name: logName("inputs", config.Name, config.Alias), - Errs: selfstat.Register("gather", "errors", - map[string]string{"input": config.Name, "alias": config.Alias}), + tags := map[string]string{"input": config.Name} + if config.Alias != "" { + tags["alias"] = config.Alias } + logger := &Logger{ + Name: logName("inputs", config.Name, config.Alias), + Errs: selfstat.Register("gather", "errors", tags), + } setLogIfExist(input, logger) return &RunningInput{ @@ -35,12 +38,12 @@ func NewRunningInput(input telegraf.Input, config *InputConfig) *RunningInput { MetricsGathered: selfstat.Register( "gather", "metrics_gathered", - map[string]string{"input": config.Name, "alias": config.Alias}, + tags, ), GatherTime: selfstat.RegisterTiming( "gather", "gather_time_ns", - map[string]string{"input": config.Name, "alias": config.Alias}, + tags, ), log: logger, } diff --git a/internal/models/running_output.go b/internal/models/running_output.go index 282c2d23b..32e9d5ceb 100644 --- a/internal/models/running_output.go +++ b/internal/models/running_output.go @@ -57,12 +57,15 @@ func NewRunningOutput( batchSize int, bufferLimit int, ) *RunningOutput { - logger := &Logger{ - Name: logName("outputs", config.Name, config.Alias), - Errs: selfstat.Register("write", "errors", - map[string]string{"output": config.Name, "alias": config.Alias}), + tags := map[string]string{"output": config.Name} + if config.Alias != "" { + tags["alias"] = config.Alias } + logger := &Logger{ + Name: logName("outputs", config.Name, config.Alias), + Errs: selfstat.Register("write", "errors", tags), + } setLogIfExist(output, logger) if config.MetricBufferLimit > 0 { @@ -88,12 +91,12 @@ func NewRunningOutput( MetricsFiltered: selfstat.Register( "write", "metrics_filtered", - map[string]string{"output": config.Name, "alias": config.Alias}, + tags, ), WriteTime: selfstat.RegisterTiming( "write", "write_time_ns", - map[string]string{"output": config.Name, "alias": config.Alias}, + tags, ), log: logger, } diff --git a/internal/models/running_processor.go b/internal/models/running_processor.go index 5a12716e5..22a7d0198 100644 --- a/internal/models/running_processor.go +++ b/internal/models/running_processor.go @@ -29,12 +29,15 @@ type ProcessorConfig struct { } func NewRunningProcessor(processor telegraf.Processor, config *ProcessorConfig) *RunningProcessor { - logger := &Logger{ - Name: logName("processors", config.Name, config.Alias), - Errs: selfstat.Register("process", "errors", - map[string]string{"input": config.Name, "alias": config.Alias}), + tags := map[string]string{"processor": config.Name} + if config.Alias != "" { + tags["alias"] = config.Alias } + logger := &Logger{ + Name: logName("processors", config.Name, config.Alias), + Errs: selfstat.Register("process", "errors", tags), + } setLogIfExist(processor, logger) return &RunningProcessor{ diff --git a/selfstat/selfstat.go b/selfstat/selfstat.go index 98ecbb4d4..821db1c94 100644 --- a/selfstat/selfstat.go +++ b/selfstat/selfstat.go @@ -32,9 +32,6 @@ type Stat interface { // Tags is a tag map. Each time this is called a new map is allocated. Tags() map[string]string - // Key is the unique measurement+tags key of the stat. - Key() uint64 - // Incr increments a regular stat by 'v'. // in the case of a timing stat, increment adds the timing to the cache. Incr(v int64) @@ -56,11 +53,7 @@ type Stat interface { // The returned Stat can be incremented by the consumer of Register(), and it's // value will be returned as a telegraf metric when Metrics() is called. func Register(measurement, field string, tags map[string]string) Stat { - return registry.register(&stat{ - measurement: "internal_" + measurement, - field: field, - tags: tags, - }) + return registry.register("internal_"+measurement, field, tags) } // RegisterTiming registers the given measurement, field, and tags in the selfstat @@ -80,11 +73,7 @@ func Register(measurement, field string, tags map[string]string) Stat { // The returned Stat can be incremented by the consumer of Register(), and it's // value will be returned as a telegraf metric when Metrics() is called. func RegisterTiming(measurement, field string, tags map[string]string) Stat { - return registry.register(&timingStat{ - measurement: "internal_" + measurement, - field: field, - tags: tags, - }) + return registry.registerTiming("internal_"+measurement, field, tags) } // Metrics returns all registered stats as telegraf metrics. @@ -125,22 +114,71 @@ type rgstry struct { mu sync.Mutex } -func (r *rgstry) register(s Stat) Stat { +func (r *rgstry) register(measurement, field string, tags map[string]string) Stat { r.mu.Lock() defer r.mu.Unlock() - if stats, ok := r.stats[s.Key()]; ok { - // measurement exists - if stat, ok := stats[s.FieldName()]; ok { - // field already exists, so don't create a new one - return stat - } - r.stats[s.Key()][s.FieldName()] = s - return s - } else { - // creating a new unique metric - r.stats[s.Key()] = map[string]Stat{s.FieldName(): s} - return s + + key := key(measurement, tags) + if stat, ok := registry.get(key, field); ok { + return stat } + + t := make(map[string]string, len(tags)) + for k, v := range tags { + t[k] = v + } + + s := &stat{ + measurement: measurement, + field: field, + tags: t, + } + registry.set(key, s) + return s +} + +func (r *rgstry) registerTiming(measurement, field string, tags map[string]string) Stat { + r.mu.Lock() + defer r.mu.Unlock() + + key := key(measurement, tags) + if stat, ok := registry.get(key, field); ok { + return stat + } + + t := make(map[string]string, len(tags)) + for k, v := range tags { + t[k] = v + } + + s := &timingStat{ + measurement: measurement, + field: field, + tags: t, + } + registry.set(key, s) + return s +} + +func (r *rgstry) get(key uint64, field string) (Stat, bool) { + if _, ok := r.stats[key]; !ok { + return nil, false + } + + if stat, ok := r.stats[key][field]; ok { + return stat, true + } + + return nil, false +} + +func (r *rgstry) set(key uint64, s Stat) { + if _, ok := r.stats[key]; !ok { + r.stats[key] = make(map[string]Stat) + } + + r.stats[key][s.FieldName()] = s + return } func key(measurement string, tags map[string]string) uint64 { diff --git a/selfstat/selfstat_test.go b/selfstat/selfstat_test.go index 2de2bd381..10ce32728 100644 --- a/selfstat/selfstat_test.go +++ b/selfstat/selfstat_test.go @@ -5,8 +5,8 @@ import ( "testing" "github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) var ( @@ -109,32 +109,17 @@ func TestRegisterTimingAndIncrAndSet(t *testing.T) { } func TestStatKeyConsistency(t *testing.T) { - s := &stat{ - measurement: "internal_stat", - field: "myfield", - tags: map[string]string{ - "foo": "bar", - "bar": "baz", - "whose": "first", - }, - } - k := s.Key() - for i := 0; i < 5000; i++ { - // assert that the Key() func doesn't change anything. - assert.Equal(t, k, s.Key()) - - // assert that two identical measurements always produce the same key. - tmp := &stat{ - measurement: "internal_stat", - field: "myfield", - tags: map[string]string{ - "foo": "bar", - "bar": "baz", - "whose": "first", - }, - } - assert.Equal(t, k, tmp.Key()) - } + lhs := key("internal_stats", map[string]string{ + "foo": "bar", + "bar": "baz", + "whose": "first", + }) + rhs := key("internal_stats", map[string]string{ + "foo": "bar", + "bar": "baz", + "whose": "first", + }) + require.Equal(t, lhs, rhs) } func TestRegisterMetricsAndVerify(t *testing.T) { @@ -219,3 +204,10 @@ func TestRegisterMetricsAndVerify(t *testing.T) { }, ) } + +func TestRegisterCopy(t *testing.T) { + tags := map[string]string{"input": "mem", "alias": "mem1"} + stat := Register("gather", "metrics_gathered", tags) + tags["new"] = "value" + require.NotEqual(t, tags, stat.Tags()) +} diff --git a/selfstat/stat.go b/selfstat/stat.go index d7ec60a2b..e1905baf5 100644 --- a/selfstat/stat.go +++ b/selfstat/stat.go @@ -41,10 +41,3 @@ func (s *stat) Tags() map[string]string { } return m } - -func (s *stat) Key() uint64 { - if s.key == 0 { - s.key = key(s.measurement, s.tags) - } - return s.key -} diff --git a/selfstat/timingStat.go b/selfstat/timingStat.go index ef0ee05aa..13f8400bc 100644 --- a/selfstat/timingStat.go +++ b/selfstat/timingStat.go @@ -57,10 +57,3 @@ func (s *timingStat) Tags() map[string]string { } return m } - -func (s *timingStat) Key() uint64 { - if s.key == 0 { - s.key = key(s.measurement, s.tags) - } - return s.key -}