Exclude alias tag if unset from plugin internal stats (#6571)
This commit is contained in:
parent
5a6fe149f6
commit
2397c53d7d
|
@ -24,10 +24,14 @@ type RunningAggregator struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewRunningAggregator(aggregator telegraf.Aggregator, config *AggregatorConfig) *RunningAggregator {
|
func NewRunningAggregator(aggregator telegraf.Aggregator, config *AggregatorConfig) *RunningAggregator {
|
||||||
|
tags := map[string]string{"aggregator": config.Name}
|
||||||
|
if config.Alias != "" {
|
||||||
|
tags["alias"] = config.Alias
|
||||||
|
}
|
||||||
|
|
||||||
logger := &Logger{
|
logger := &Logger{
|
||||||
Name: logName("aggregators", config.Name, config.Alias),
|
Name: logName("aggregators", config.Name, config.Alias),
|
||||||
Errs: selfstat.Register("aggregate", "errors",
|
Errs: selfstat.Register("aggregate", "errors", tags),
|
||||||
map[string]string{"input": config.Name, "alias": config.Alias}),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
setLogIfExist(aggregator, logger)
|
setLogIfExist(aggregator, logger)
|
||||||
|
@ -38,22 +42,22 @@ func NewRunningAggregator(aggregator telegraf.Aggregator, config *AggregatorConf
|
||||||
MetricsPushed: selfstat.Register(
|
MetricsPushed: selfstat.Register(
|
||||||
"aggregate",
|
"aggregate",
|
||||||
"metrics_pushed",
|
"metrics_pushed",
|
||||||
map[string]string{"aggregator": config.Name, "alias": config.Alias},
|
tags,
|
||||||
),
|
),
|
||||||
MetricsFiltered: selfstat.Register(
|
MetricsFiltered: selfstat.Register(
|
||||||
"aggregate",
|
"aggregate",
|
||||||
"metrics_filtered",
|
"metrics_filtered",
|
||||||
map[string]string{"aggregator": config.Name, "alias": config.Alias},
|
tags,
|
||||||
),
|
),
|
||||||
MetricsDropped: selfstat.Register(
|
MetricsDropped: selfstat.Register(
|
||||||
"aggregate",
|
"aggregate",
|
||||||
"metrics_dropped",
|
"metrics_dropped",
|
||||||
map[string]string{"aggregator": config.Name, "alias": config.Alias},
|
tags,
|
||||||
),
|
),
|
||||||
PushTime: selfstat.Register(
|
PushTime: selfstat.Register(
|
||||||
"aggregate",
|
"aggregate",
|
||||||
"push_time_ns",
|
"push_time_ns",
|
||||||
map[string]string{"aggregator": config.Name, "alias": config.Alias},
|
tags,
|
||||||
),
|
),
|
||||||
log: logger,
|
log: logger,
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,12 +21,15 @@ type RunningInput struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewRunningInput(input telegraf.Input, config *InputConfig) *RunningInput {
|
func NewRunningInput(input telegraf.Input, config *InputConfig) *RunningInput {
|
||||||
logger := &Logger{
|
tags := map[string]string{"input": config.Name}
|
||||||
Name: logName("inputs", config.Name, config.Alias),
|
if config.Alias != "" {
|
||||||
Errs: selfstat.Register("gather", "errors",
|
tags["alias"] = config.Alias
|
||||||
map[string]string{"input": config.Name, "alias": config.Alias}),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logger := &Logger{
|
||||||
|
Name: logName("inputs", config.Name, config.Alias),
|
||||||
|
Errs: selfstat.Register("gather", "errors", tags),
|
||||||
|
}
|
||||||
setLogIfExist(input, logger)
|
setLogIfExist(input, logger)
|
||||||
|
|
||||||
return &RunningInput{
|
return &RunningInput{
|
||||||
|
@ -35,12 +38,12 @@ func NewRunningInput(input telegraf.Input, config *InputConfig) *RunningInput {
|
||||||
MetricsGathered: selfstat.Register(
|
MetricsGathered: selfstat.Register(
|
||||||
"gather",
|
"gather",
|
||||||
"metrics_gathered",
|
"metrics_gathered",
|
||||||
map[string]string{"input": config.Name, "alias": config.Alias},
|
tags,
|
||||||
),
|
),
|
||||||
GatherTime: selfstat.RegisterTiming(
|
GatherTime: selfstat.RegisterTiming(
|
||||||
"gather",
|
"gather",
|
||||||
"gather_time_ns",
|
"gather_time_ns",
|
||||||
map[string]string{"input": config.Name, "alias": config.Alias},
|
tags,
|
||||||
),
|
),
|
||||||
log: logger,
|
log: logger,
|
||||||
}
|
}
|
||||||
|
|
|
@ -57,12 +57,15 @@ func NewRunningOutput(
|
||||||
batchSize int,
|
batchSize int,
|
||||||
bufferLimit int,
|
bufferLimit int,
|
||||||
) *RunningOutput {
|
) *RunningOutput {
|
||||||
logger := &Logger{
|
tags := map[string]string{"output": config.Name}
|
||||||
Name: logName("outputs", config.Name, config.Alias),
|
if config.Alias != "" {
|
||||||
Errs: selfstat.Register("write", "errors",
|
tags["alias"] = config.Alias
|
||||||
map[string]string{"output": config.Name, "alias": config.Alias}),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logger := &Logger{
|
||||||
|
Name: logName("outputs", config.Name, config.Alias),
|
||||||
|
Errs: selfstat.Register("write", "errors", tags),
|
||||||
|
}
|
||||||
setLogIfExist(output, logger)
|
setLogIfExist(output, logger)
|
||||||
|
|
||||||
if config.MetricBufferLimit > 0 {
|
if config.MetricBufferLimit > 0 {
|
||||||
|
@ -88,12 +91,12 @@ func NewRunningOutput(
|
||||||
MetricsFiltered: selfstat.Register(
|
MetricsFiltered: selfstat.Register(
|
||||||
"write",
|
"write",
|
||||||
"metrics_filtered",
|
"metrics_filtered",
|
||||||
map[string]string{"output": config.Name, "alias": config.Alias},
|
tags,
|
||||||
),
|
),
|
||||||
WriteTime: selfstat.RegisterTiming(
|
WriteTime: selfstat.RegisterTiming(
|
||||||
"write",
|
"write",
|
||||||
"write_time_ns",
|
"write_time_ns",
|
||||||
map[string]string{"output": config.Name, "alias": config.Alias},
|
tags,
|
||||||
),
|
),
|
||||||
log: logger,
|
log: logger,
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,12 +29,15 @@ type ProcessorConfig struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewRunningProcessor(processor telegraf.Processor, config *ProcessorConfig) *RunningProcessor {
|
func NewRunningProcessor(processor telegraf.Processor, config *ProcessorConfig) *RunningProcessor {
|
||||||
logger := &Logger{
|
tags := map[string]string{"processor": config.Name}
|
||||||
Name: logName("processors", config.Name, config.Alias),
|
if config.Alias != "" {
|
||||||
Errs: selfstat.Register("process", "errors",
|
tags["alias"] = config.Alias
|
||||||
map[string]string{"input": config.Name, "alias": config.Alias}),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logger := &Logger{
|
||||||
|
Name: logName("processors", config.Name, config.Alias),
|
||||||
|
Errs: selfstat.Register("process", "errors", tags),
|
||||||
|
}
|
||||||
setLogIfExist(processor, logger)
|
setLogIfExist(processor, logger)
|
||||||
|
|
||||||
return &RunningProcessor{
|
return &RunningProcessor{
|
||||||
|
|
|
@ -32,9 +32,6 @@ type Stat interface {
|
||||||
// Tags is a tag map. Each time this is called a new map is allocated.
|
// Tags is a tag map. Each time this is called a new map is allocated.
|
||||||
Tags() map[string]string
|
Tags() map[string]string
|
||||||
|
|
||||||
// Key is the unique measurement+tags key of the stat.
|
|
||||||
Key() uint64
|
|
||||||
|
|
||||||
// Incr increments a regular stat by 'v'.
|
// Incr increments a regular stat by 'v'.
|
||||||
// in the case of a timing stat, increment adds the timing to the cache.
|
// in the case of a timing stat, increment adds the timing to the cache.
|
||||||
Incr(v int64)
|
Incr(v int64)
|
||||||
|
@ -56,11 +53,7 @@ type Stat interface {
|
||||||
// The returned Stat can be incremented by the consumer of Register(), and it's
|
// The returned Stat can be incremented by the consumer of Register(), and it's
|
||||||
// value will be returned as a telegraf metric when Metrics() is called.
|
// value will be returned as a telegraf metric when Metrics() is called.
|
||||||
func Register(measurement, field string, tags map[string]string) Stat {
|
func Register(measurement, field string, tags map[string]string) Stat {
|
||||||
return registry.register(&stat{
|
return registry.register("internal_"+measurement, field, tags)
|
||||||
measurement: "internal_" + measurement,
|
|
||||||
field: field,
|
|
||||||
tags: tags,
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// RegisterTiming registers the given measurement, field, and tags in the selfstat
|
// RegisterTiming registers the given measurement, field, and tags in the selfstat
|
||||||
|
@ -80,11 +73,7 @@ func Register(measurement, field string, tags map[string]string) Stat {
|
||||||
// The returned Stat can be incremented by the consumer of Register(), and it's
|
// The returned Stat can be incremented by the consumer of Register(), and it's
|
||||||
// value will be returned as a telegraf metric when Metrics() is called.
|
// value will be returned as a telegraf metric when Metrics() is called.
|
||||||
func RegisterTiming(measurement, field string, tags map[string]string) Stat {
|
func RegisterTiming(measurement, field string, tags map[string]string) Stat {
|
||||||
return registry.register(&timingStat{
|
return registry.registerTiming("internal_"+measurement, field, tags)
|
||||||
measurement: "internal_" + measurement,
|
|
||||||
field: field,
|
|
||||||
tags: tags,
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Metrics returns all registered stats as telegraf metrics.
|
// Metrics returns all registered stats as telegraf metrics.
|
||||||
|
@ -125,22 +114,71 @@ type rgstry struct {
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *rgstry) register(s Stat) Stat {
|
func (r *rgstry) register(measurement, field string, tags map[string]string) Stat {
|
||||||
r.mu.Lock()
|
r.mu.Lock()
|
||||||
defer r.mu.Unlock()
|
defer r.mu.Unlock()
|
||||||
if stats, ok := r.stats[s.Key()]; ok {
|
|
||||||
// measurement exists
|
key := key(measurement, tags)
|
||||||
if stat, ok := stats[s.FieldName()]; ok {
|
if stat, ok := registry.get(key, field); ok {
|
||||||
// field already exists, so don't create a new one
|
return stat
|
||||||
return stat
|
|
||||||
}
|
|
||||||
r.stats[s.Key()][s.FieldName()] = s
|
|
||||||
return s
|
|
||||||
} else {
|
|
||||||
// creating a new unique metric
|
|
||||||
r.stats[s.Key()] = map[string]Stat{s.FieldName(): s}
|
|
||||||
return s
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
t := make(map[string]string, len(tags))
|
||||||
|
for k, v := range tags {
|
||||||
|
t[k] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
s := &stat{
|
||||||
|
measurement: measurement,
|
||||||
|
field: field,
|
||||||
|
tags: t,
|
||||||
|
}
|
||||||
|
registry.set(key, s)
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rgstry) registerTiming(measurement, field string, tags map[string]string) Stat {
|
||||||
|
r.mu.Lock()
|
||||||
|
defer r.mu.Unlock()
|
||||||
|
|
||||||
|
key := key(measurement, tags)
|
||||||
|
if stat, ok := registry.get(key, field); ok {
|
||||||
|
return stat
|
||||||
|
}
|
||||||
|
|
||||||
|
t := make(map[string]string, len(tags))
|
||||||
|
for k, v := range tags {
|
||||||
|
t[k] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
s := &timingStat{
|
||||||
|
measurement: measurement,
|
||||||
|
field: field,
|
||||||
|
tags: t,
|
||||||
|
}
|
||||||
|
registry.set(key, s)
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rgstry) get(key uint64, field string) (Stat, bool) {
|
||||||
|
if _, ok := r.stats[key]; !ok {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
|
||||||
|
if stat, ok := r.stats[key][field]; ok {
|
||||||
|
return stat, true
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rgstry) set(key uint64, s Stat) {
|
||||||
|
if _, ok := r.stats[key]; !ok {
|
||||||
|
r.stats[key] = make(map[string]Stat)
|
||||||
|
}
|
||||||
|
|
||||||
|
r.stats[key][s.FieldName()] = s
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func key(measurement string, tags map[string]string) uint64 {
|
func key(measurement string, tags map[string]string) uint64 {
|
||||||
|
|
|
@ -5,8 +5,8 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -109,32 +109,17 @@ func TestRegisterTimingAndIncrAndSet(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestStatKeyConsistency(t *testing.T) {
|
func TestStatKeyConsistency(t *testing.T) {
|
||||||
s := &stat{
|
lhs := key("internal_stats", map[string]string{
|
||||||
measurement: "internal_stat",
|
"foo": "bar",
|
||||||
field: "myfield",
|
"bar": "baz",
|
||||||
tags: map[string]string{
|
"whose": "first",
|
||||||
"foo": "bar",
|
})
|
||||||
"bar": "baz",
|
rhs := key("internal_stats", map[string]string{
|
||||||
"whose": "first",
|
"foo": "bar",
|
||||||
},
|
"bar": "baz",
|
||||||
}
|
"whose": "first",
|
||||||
k := s.Key()
|
})
|
||||||
for i := 0; i < 5000; i++ {
|
require.Equal(t, lhs, rhs)
|
||||||
// assert that the Key() func doesn't change anything.
|
|
||||||
assert.Equal(t, k, s.Key())
|
|
||||||
|
|
||||||
// assert that two identical measurements always produce the same key.
|
|
||||||
tmp := &stat{
|
|
||||||
measurement: "internal_stat",
|
|
||||||
field: "myfield",
|
|
||||||
tags: map[string]string{
|
|
||||||
"foo": "bar",
|
|
||||||
"bar": "baz",
|
|
||||||
"whose": "first",
|
|
||||||
},
|
|
||||||
}
|
|
||||||
assert.Equal(t, k, tmp.Key())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRegisterMetricsAndVerify(t *testing.T) {
|
func TestRegisterMetricsAndVerify(t *testing.T) {
|
||||||
|
@ -219,3 +204,10 @@ func TestRegisterMetricsAndVerify(t *testing.T) {
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRegisterCopy(t *testing.T) {
|
||||||
|
tags := map[string]string{"input": "mem", "alias": "mem1"}
|
||||||
|
stat := Register("gather", "metrics_gathered", tags)
|
||||||
|
tags["new"] = "value"
|
||||||
|
require.NotEqual(t, tags, stat.Tags())
|
||||||
|
}
|
||||||
|
|
|
@ -41,10 +41,3 @@ func (s *stat) Tags() map[string]string {
|
||||||
}
|
}
|
||||||
return m
|
return m
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *stat) Key() uint64 {
|
|
||||||
if s.key == 0 {
|
|
||||||
s.key = key(s.measurement, s.tags)
|
|
||||||
}
|
|
||||||
return s.key
|
|
||||||
}
|
|
||||||
|
|
|
@ -57,10 +57,3 @@ func (s *timingStat) Tags() map[string]string {
|
||||||
}
|
}
|
||||||
return m
|
return m
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *timingStat) Key() uint64 {
|
|
||||||
if s.key == 0 {
|
|
||||||
s.key = key(s.measurement, s.tags)
|
|
||||||
}
|
|
||||||
return s.key
|
|
||||||
}
|
|
||||||
|
|
Loading…
Reference in New Issue