diff --git a/accumulator.go b/accumulator.go index bb6e4dc85..13fd6e571 100644 --- a/accumulator.go +++ b/accumulator.go @@ -2,9 +2,8 @@ package telegraf import "time" -// Accumulator is an interface for "accumulating" metrics from input plugin(s). -// The metrics are sent down a channel shared between all input plugins and then -// flushed on the configured flush_interval. +// Accumulator is an interface for "accumulating" metrics from plugin(s). +// The metrics are sent down a channel shared between all plugins. type Accumulator interface { // AddFields adds a metric to the accumulator with the given measurement // name, fields, and tags (and timestamp). If a timestamp is not provided, @@ -29,12 +28,7 @@ type Accumulator interface { tags map[string]string, t ...time.Time) - AddError(err error) - - Debug() bool - SetDebug(enabled bool) - SetPrecision(precision, interval time.Duration) - DisablePrecision() + AddError(err error) } diff --git a/agent/accumulator.go b/agent/accumulator.go index 752e2b91f..0a84cedd8 100644 --- a/agent/accumulator.go +++ b/agent/accumulator.go @@ -1,37 +1,40 @@ package agent import ( - "fmt" "log" - "math" "sync/atomic" "time" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/internal/models" ) +type MetricMaker interface { + Name() string + MakeMetric( + measurement string, + fields map[string]interface{}, + tags map[string]string, + mType telegraf.ValueType, + t time.Time, + ) telegraf.Metric +} + func NewAccumulator( - inputConfig *models.InputConfig, + maker MetricMaker, metrics chan telegraf.Metric, ) *accumulator { - acc := accumulator{} - acc.metrics = metrics - acc.inputConfig = inputConfig - acc.precision = time.Nanosecond + acc := accumulator{ + maker: maker, + metrics: metrics, + precision: time.Nanosecond, + } return &acc } type accumulator struct { metrics chan telegraf.Metric - defaultTags map[string]string - - debug bool - // print every point added to the accumulator - trace bool - - inputConfig *models.InputConfig + maker MetricMaker precision time.Duration @@ -44,7 +47,7 @@ func (ac *accumulator) AddFields( tags map[string]string, t ...time.Time, ) { - if m := ac.makeMetric(measurement, fields, tags, telegraf.Untyped, t...); m != nil { + if m := ac.maker.MakeMetric(measurement, fields, tags, telegraf.Untyped, ac.getTime(t)); m != nil { ac.metrics <- m } } @@ -55,7 +58,7 @@ func (ac *accumulator) AddGauge( tags map[string]string, t ...time.Time, ) { - if m := ac.makeMetric(measurement, fields, tags, telegraf.Gauge, t...); m != nil { + if m := ac.maker.MakeMetric(measurement, fields, tags, telegraf.Gauge, ac.getTime(t)); m != nil { ac.metrics <- m } } @@ -66,113 +69,10 @@ func (ac *accumulator) AddCounter( tags map[string]string, t ...time.Time, ) { - if m := ac.makeMetric(measurement, fields, tags, telegraf.Counter, t...); m != nil { + if m := ac.maker.MakeMetric(measurement, fields, tags, telegraf.Counter, ac.getTime(t)); m != nil { ac.metrics <- m } -} - -// makeMetric either returns a metric, or returns nil if the metric doesn't -// need to be created (because of filtering, an error, etc.) -func (ac *accumulator) makeMetric( - measurement string, - fields map[string]interface{}, - tags map[string]string, - mType telegraf.ValueType, - t ...time.Time, -) telegraf.Metric { - if len(fields) == 0 || len(measurement) == 0 { - return nil } - if tags == nil { - tags = make(map[string]string) - } - - // Override measurement name if set - if len(ac.inputConfig.NameOverride) != 0 { - measurement = ac.inputConfig.NameOverride - } - // Apply measurement prefix and suffix if set - if len(ac.inputConfig.MeasurementPrefix) != 0 { - measurement = ac.inputConfig.MeasurementPrefix + measurement - } - if len(ac.inputConfig.MeasurementSuffix) != 0 { - measurement = measurement + ac.inputConfig.MeasurementSuffix - } - - // Apply plugin-wide tags if set - for k, v := range ac.inputConfig.Tags { - if _, ok := tags[k]; !ok { - tags[k] = v - } - } - // Apply daemon-wide tags if set - for k, v := range ac.defaultTags { - if _, ok := tags[k]; !ok { - tags[k] = v - } - } - - // Apply the metric filter(s) - if ok := ac.inputConfig.Filter.Apply(measurement, fields, tags); !ok { - return nil - } - - for k, v := range fields { - // Validate uint64 and float64 fields - switch val := v.(type) { - case uint64: - // InfluxDB does not support writing uint64 - if val < uint64(9223372036854775808) { - fields[k] = int64(val) - } else { - fields[k] = int64(9223372036854775807) - } - continue - case float64: - // NaNs are invalid values in influxdb, skip measurement - if math.IsNaN(val) || math.IsInf(val, 0) { - if ac.debug { - log.Printf("I! Measurement [%s] field [%s] has a NaN or Inf "+ - "field, skipping", - measurement, k) - } - delete(fields, k) - continue - } - } - - fields[k] = v - } - - var timestamp time.Time - if len(t) > 0 { - timestamp = t[0] - } else { - timestamp = time.Now() - } - timestamp = timestamp.Round(ac.precision) - - var m telegraf.Metric - var err error - switch mType { - case telegraf.Counter: - m, err = telegraf.NewCounterMetric(measurement, tags, fields, timestamp) - case telegraf.Gauge: - m, err = telegraf.NewGaugeMetric(measurement, tags, fields, timestamp) - default: - m, err = telegraf.NewMetric(measurement, tags, fields, timestamp) - } - if err != nil { - log.Printf("E! Error adding point [%s]: %s\n", measurement, err.Error()) - return nil - } - - if ac.trace { - fmt.Println("> " + m.String()) - } - - return m -} // AddError passes a runtime error to the accumulator. // The error will be tagged with the plugin name and written to the log. @@ -182,23 +82,7 @@ func (ac *accumulator) AddError(err error) { } atomic.AddUint64(&ac.errCount, 1) //TODO suppress/throttle consecutive duplicate errors? - log.Printf("E! Error in input [%s]: %s", ac.inputConfig.Name, err) -} - -func (ac *accumulator) Debug() bool { - return ac.debug -} - -func (ac *accumulator) SetDebug(debug bool) { - ac.debug = debug -} - -func (ac *accumulator) Trace() bool { - return ac.trace -} - -func (ac *accumulator) SetTrace(trace bool) { - ac.trace = trace + log.Printf("E! Error in plugin [%s]: %s", ac.maker.Name(), err) } // SetPrecision takes two time.Duration objects. If the first is non-zero, @@ -222,17 +106,12 @@ func (ac *accumulator) SetPrecision(precision, interval time.Duration) { } } -func (ac *accumulator) DisablePrecision() { - ac.precision = time.Nanosecond +func (ac accumulator) getTime(t []time.Time) time.Time { + var timestamp time.Time + if len(t) > 0 { + timestamp = t[0] + } else { + timestamp = time.Now() + return timestamp.Round(ac.precision) } - -func (ac *accumulator) setDefaultTags(tags map[string]string) { - ac.defaultTags = tags -} - -func (ac *accumulator) addDefaultTag(key, value string) { - if ac.defaultTags == nil { - ac.defaultTags = make(map[string]string) - } - ac.defaultTags[key] = value } diff --git a/agent/accumulator_test.go b/agent/accumulator_test.go index ef5a34ec9..ef8d9eb20 100644 --- a/agent/accumulator_test.go +++ b/agent/accumulator_test.go @@ -4,24 +4,21 @@ import ( "bytes" "fmt" "log" - "math" "os" "testing" "time" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/internal/models" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) func TestAdd(t *testing.T) { - a := accumulator{} now := time.Now() - a.metrics = make(chan telegraf.Metric, 10) - defer close(a.metrics) - a.inputConfig = &models.InputConfig{} + metrics := make(chan telegraf.Metric, 10) + defer close(metrics) + a := NewAccumulator(&TestMetricMaker{}, metrics) a.AddFields("acctest", map[string]interface{}{"value": float64(101)}, @@ -33,97 +30,142 @@ func TestAdd(t *testing.T) { map[string]interface{}{"value": float64(101)}, map[string]string{"acc": "test"}, now) - testm := <-a.metrics + testm := <-metrics actual := testm.String() assert.Contains(t, actual, "acctest value=101") - testm = <-a.metrics + testm = <-metrics actual = testm.String() assert.Contains(t, actual, "acctest,acc=test value=101") - testm = <-a.metrics + testm = <-metrics actual = testm.String() assert.Equal(t, fmt.Sprintf("acctest,acc=test value=101 %d", now.UnixNano()), actual) } -func TestAddGauge(t *testing.T) { - a := accumulator{} +func TestAddFields(t *testing.T) { now := time.Now() - a.metrics = make(chan telegraf.Metric, 10) - defer close(a.metrics) - a.inputConfig = &models.InputConfig{} + metrics := make(chan telegraf.Metric, 10) + defer close(metrics) + a := NewAccumulator(&TestMetricMaker{}, metrics) - a.AddGauge("acctest", - map[string]interface{}{"value": float64(101)}, - map[string]string{}) - a.AddGauge("acctest", - map[string]interface{}{"value": float64(101)}, - map[string]string{"acc": "test"}) - a.AddGauge("acctest", - map[string]interface{}{"value": float64(101)}, - map[string]string{"acc": "test"}, now) + fields := map[string]interface{}{ + "usage": float64(99), + } + a.AddFields("acctest", fields, map[string]string{}) + a.AddGauge("acctest", fields, map[string]string{"acc": "test"}) + a.AddCounter("acctest", fields, map[string]string{"acc": "test"}, now) - testm := <-a.metrics + testm := <-metrics actual := testm.String() - assert.Contains(t, actual, "acctest value=101") - assert.Equal(t, testm.Type(), telegraf.Gauge) + assert.Contains(t, actual, "acctest usage=99") - testm = <-a.metrics + testm = <-metrics actual = testm.String() - assert.Contains(t, actual, "acctest,acc=test value=101") - assert.Equal(t, testm.Type(), telegraf.Gauge) + assert.Contains(t, actual, "acctest,acc=test usage=99") - testm = <-a.metrics + testm = <-metrics actual = testm.String() assert.Equal(t, - fmt.Sprintf("acctest,acc=test value=101 %d", now.UnixNano()), + fmt.Sprintf("acctest,acc=test usage=99 %d", now.UnixNano()), actual) - assert.Equal(t, testm.Type(), telegraf.Gauge) } -func TestAddCounter(t *testing.T) { - a := accumulator{} - now := time.Now() - a.metrics = make(chan telegraf.Metric, 10) - defer close(a.metrics) - a.inputConfig = &models.InputConfig{} +func TestAccAddError(t *testing.T) { + errBuf := bytes.NewBuffer(nil) + log.SetOutput(errBuf) + defer log.SetOutput(os.Stderr) - a.AddCounter("acctest", + metrics := make(chan telegraf.Metric, 10) + defer close(metrics) + a := NewAccumulator(&TestMetricMaker{}, metrics) + + a.AddError(fmt.Errorf("foo")) + a.AddError(fmt.Errorf("bar")) + a.AddError(fmt.Errorf("baz")) + + errs := bytes.Split(errBuf.Bytes(), []byte{'\n'}) + assert.EqualValues(t, 3, a.errCount) + require.Len(t, errs, 4) // 4 because of trailing newline + assert.Contains(t, string(errs[0]), "TestPlugin") + assert.Contains(t, string(errs[0]), "foo") + assert.Contains(t, string(errs[1]), "TestPlugin") + assert.Contains(t, string(errs[1]), "bar") + assert.Contains(t, string(errs[2]), "TestPlugin") + assert.Contains(t, string(errs[2]), "baz") +} + +func TestAddNoIntervalWithPrecision(t *testing.T) { + now := time.Date(2006, time.February, 10, 12, 0, 0, 82912748, time.UTC) + metrics := make(chan telegraf.Metric, 10) + defer close(metrics) + a := NewAccumulator(&TestMetricMaker{}, metrics) + a.SetPrecision(0, time.Second) + + a.AddFields("acctest", map[string]interface{}{"value": float64(101)}, map[string]string{}) - a.AddCounter("acctest", + a.AddFields("acctest", map[string]interface{}{"value": float64(101)}, map[string]string{"acc": "test"}) - a.AddCounter("acctest", + a.AddFields("acctest", map[string]interface{}{"value": float64(101)}, map[string]string{"acc": "test"}, now) testm := <-a.metrics actual := testm.String() assert.Contains(t, actual, "acctest value=101") - assert.Equal(t, testm.Type(), telegraf.Counter) testm = <-a.metrics actual = testm.String() assert.Contains(t, actual, "acctest,acc=test value=101") - assert.Equal(t, testm.Type(), telegraf.Counter) testm = <-a.metrics actual = testm.String() assert.Equal(t, - fmt.Sprintf("acctest,acc=test value=101 %d", now.UnixNano()), + fmt.Sprintf("acctest,acc=test value=101 %d", int64(1139572800000000000)), + actual) +} + +func TestAddDisablePrecision(t *testing.T) { + now := time.Date(2006, time.February, 10, 12, 0, 0, 82912748, time.UTC) + metrics := make(chan telegraf.Metric, 10) + defer close(metrics) + a := NewAccumulator(&TestMetricMaker{}, metrics) + + a.SetPrecision(time.Nanosecond, 0) + a.AddFields("acctest", + map[string]interface{}{"value": float64(101)}, + map[string]string{}) + a.AddFields("acctest", + map[string]interface{}{"value": float64(101)}, + map[string]string{"acc": "test"}) + a.AddFields("acctest", + map[string]interface{}{"value": float64(101)}, + map[string]string{"acc": "test"}, now) + + testm := <-a.metrics + actual := testm.String() + assert.Contains(t, actual, "acctest value=101") + + testm = <-a.metrics + actual = testm.String() + assert.Contains(t, actual, "acctest,acc=test value=101") + + testm = <-a.metrics + actual = testm.String() + assert.Equal(t, + fmt.Sprintf("acctest,acc=test value=101 %d", int64(1139572800082912748)), actual) - assert.Equal(t, testm.Type(), telegraf.Counter) } func TestAddNoPrecisionWithInterval(t *testing.T) { - a := accumulator{} now := time.Date(2006, time.February, 10, 12, 0, 0, 82912748, time.UTC) - a.metrics = make(chan telegraf.Metric, 10) - defer close(a.metrics) - a.inputConfig = &models.InputConfig{} + metrics := make(chan telegraf.Metric, 10) + defer close(metrics) + a := NewAccumulator(&TestMetricMaker{}, metrics) a.SetPrecision(0, time.Second) a.AddFields("acctest", @@ -151,79 +193,11 @@ func TestAddNoPrecisionWithInterval(t *testing.T) { actual) } -func TestAddNoIntervalWithPrecision(t *testing.T) { - a := accumulator{} - now := time.Date(2006, time.February, 10, 12, 0, 0, 82912748, time.UTC) - a.metrics = make(chan telegraf.Metric, 10) - defer close(a.metrics) - a.inputConfig = &models.InputConfig{} - - a.SetPrecision(time.Second, time.Millisecond) - a.AddFields("acctest", - map[string]interface{}{"value": float64(101)}, - map[string]string{}) - a.AddFields("acctest", - map[string]interface{}{"value": float64(101)}, - map[string]string{"acc": "test"}) - a.AddFields("acctest", - map[string]interface{}{"value": float64(101)}, - map[string]string{"acc": "test"}, now) - - testm := <-a.metrics - actual := testm.String() - assert.Contains(t, actual, "acctest value=101") - - testm = <-a.metrics - actual = testm.String() - assert.Contains(t, actual, "acctest,acc=test value=101") - - testm = <-a.metrics - actual = testm.String() - assert.Equal(t, - fmt.Sprintf("acctest,acc=test value=101 %d", int64(1139572800000000000)), - actual) -} - -func TestAddDisablePrecision(t *testing.T) { - a := accumulator{} - now := time.Date(2006, time.February, 10, 12, 0, 0, 82912748, time.UTC) - a.metrics = make(chan telegraf.Metric, 10) - defer close(a.metrics) - a.inputConfig = &models.InputConfig{} - - a.SetPrecision(time.Second, time.Millisecond) - a.DisablePrecision() - a.AddFields("acctest", - map[string]interface{}{"value": float64(101)}, - map[string]string{}) - a.AddFields("acctest", - map[string]interface{}{"value": float64(101)}, - map[string]string{"acc": "test"}) - a.AddFields("acctest", - map[string]interface{}{"value": float64(101)}, - map[string]string{"acc": "test"}, now) - - testm := <-a.metrics - actual := testm.String() - assert.Contains(t, actual, "acctest value=101") - - testm = <-a.metrics - actual = testm.String() - assert.Contains(t, actual, "acctest,acc=test value=101") - - testm = <-a.metrics - actual = testm.String() - assert.Equal(t, - fmt.Sprintf("acctest,acc=test value=101 %d", int64(1139572800082912748)), - actual) -} - func TestDifferentPrecisions(t *testing.T) { - a := accumulator{} now := time.Date(2006, time.February, 10, 12, 0, 0, 82912748, time.UTC) - a.metrics = make(chan telegraf.Metric, 10) - defer close(a.metrics) - a.inputConfig = &models.InputConfig{} + metrics := make(chan telegraf.Metric, 10) + defer close(metrics) + a := NewAccumulator(&TestMetricMaker{}, metrics) a.SetPrecision(0, time.Second) a.AddFields("acctest", @@ -266,349 +240,100 @@ func TestDifferentPrecisions(t *testing.T) { actual) } -func TestAddDefaultTags(t *testing.T) { - a := accumulator{} - a.addDefaultTag("default", "tag") +func TestAddGauge(t *testing.T) { now := time.Now() - a.metrics = make(chan telegraf.Metric, 10) - defer close(a.metrics) - a.inputConfig = &models.InputConfig{} + metrics := make(chan telegraf.Metric, 10) + defer close(metrics) + a := NewAccumulator(&TestMetricMaker{}, metrics) - a.AddFields("acctest", + a.AddGauge("acctest", map[string]interface{}{"value": float64(101)}, map[string]string{}) - a.AddFields("acctest", + a.AddGauge("acctest", map[string]interface{}{"value": float64(101)}, map[string]string{"acc": "test"}) - a.AddFields("acctest", + a.AddGauge("acctest", map[string]interface{}{"value": float64(101)}, map[string]string{"acc": "test"}, now) - testm := <-a.metrics - actual := testm.String() - assert.Contains(t, actual, "acctest,default=tag value=101") - - testm = <-a.metrics - actual = testm.String() - assert.Contains(t, actual, "acctest,acc=test,default=tag value=101") - - testm = <-a.metrics - actual = testm.String() - assert.Equal(t, - fmt.Sprintf("acctest,acc=test,default=tag value=101 %d", now.UnixNano()), - actual) -} - -func TestAddFields(t *testing.T) { - a := accumulator{} - now := time.Now() - a.metrics = make(chan telegraf.Metric, 10) - defer close(a.metrics) - a.inputConfig = &models.InputConfig{} - - fields := map[string]interface{}{ - "usage": float64(99), - } - a.AddFields("acctest", fields, map[string]string{}) - a.AddFields("acctest", fields, map[string]string{"acc": "test"}) - a.AddFields("acctest", fields, map[string]string{"acc": "test"}, now) - - testm := <-a.metrics - actual := testm.String() - assert.Contains(t, actual, "acctest usage=99") - - testm = <-a.metrics - actual = testm.String() - assert.Contains(t, actual, "acctest,acc=test usage=99") - - testm = <-a.metrics - actual = testm.String() - assert.Equal(t, - fmt.Sprintf("acctest,acc=test usage=99 %d", now.UnixNano()), - actual) -} - -// Test that all Inf fields get dropped, and not added to metrics channel -func TestAddInfFields(t *testing.T) { - inf := math.Inf(1) - ninf := math.Inf(-1) - - a := accumulator{} - now := time.Now() - a.metrics = make(chan telegraf.Metric, 10) - defer close(a.metrics) - a.inputConfig = &models.InputConfig{} - - fields := map[string]interface{}{ - "usage": inf, - "nusage": ninf, - } - a.AddFields("acctest", fields, map[string]string{}) - a.AddFields("acctest", fields, map[string]string{"acc": "test"}) - a.AddFields("acctest", fields, map[string]string{"acc": "test"}, now) - - assert.Len(t, a.metrics, 0) - - // test that non-inf fields are kept and not dropped - fields["notinf"] = float64(100) - a.AddFields("acctest", fields, map[string]string{}) - testm := <-a.metrics - actual := testm.String() - assert.Contains(t, actual, "acctest notinf=100") -} - -// Test that nan fields are dropped and not added -func TestAddNaNFields(t *testing.T) { - nan := math.NaN() - - a := accumulator{} - now := time.Now() - a.metrics = make(chan telegraf.Metric, 10) - defer close(a.metrics) - a.inputConfig = &models.InputConfig{} - - fields := map[string]interface{}{ - "usage": nan, - } - a.AddFields("acctest", fields, map[string]string{}) - a.AddFields("acctest", fields, map[string]string{"acc": "test"}) - a.AddFields("acctest", fields, map[string]string{"acc": "test"}, now) - - assert.Len(t, a.metrics, 0) - - // test that non-nan fields are kept and not dropped - fields["notnan"] = float64(100) - a.AddFields("acctest", fields, map[string]string{}) - testm := <-a.metrics - actual := testm.String() - assert.Contains(t, actual, "acctest notnan=100") -} - -func TestAddUint64Fields(t *testing.T) { - a := accumulator{} - now := time.Now() - a.metrics = make(chan telegraf.Metric, 10) - defer close(a.metrics) - a.inputConfig = &models.InputConfig{} - - fields := map[string]interface{}{ - "usage": uint64(99), - } - a.AddFields("acctest", fields, map[string]string{}) - a.AddFields("acctest", fields, map[string]string{"acc": "test"}) - a.AddFields("acctest", fields, map[string]string{"acc": "test"}, now) - - testm := <-a.metrics - actual := testm.String() - assert.Contains(t, actual, "acctest usage=99i") - - testm = <-a.metrics - actual = testm.String() - assert.Contains(t, actual, "acctest,acc=test usage=99i") - - testm = <-a.metrics - actual = testm.String() - assert.Equal(t, - fmt.Sprintf("acctest,acc=test usage=99i %d", now.UnixNano()), - actual) -} - -func TestAddUint64Overflow(t *testing.T) { - a := accumulator{} - now := time.Now() - a.metrics = make(chan telegraf.Metric, 10) - defer close(a.metrics) - a.inputConfig = &models.InputConfig{} - - fields := map[string]interface{}{ - "usage": uint64(9223372036854775808), - } - a.AddFields("acctest", fields, map[string]string{}) - a.AddFields("acctest", fields, map[string]string{"acc": "test"}) - a.AddFields("acctest", fields, map[string]string{"acc": "test"}, now) - - testm := <-a.metrics - actual := testm.String() - assert.Contains(t, actual, "acctest usage=9223372036854775807i") - - testm = <-a.metrics - actual = testm.String() - assert.Contains(t, actual, "acctest,acc=test usage=9223372036854775807i") - - testm = <-a.metrics - actual = testm.String() - assert.Equal(t, - fmt.Sprintf("acctest,acc=test usage=9223372036854775807i %d", now.UnixNano()), - actual) -} - -func TestAddInts(t *testing.T) { - a := accumulator{} - a.addDefaultTag("default", "tag") - now := time.Now() - a.metrics = make(chan telegraf.Metric, 10) - defer close(a.metrics) - a.inputConfig = &models.InputConfig{} - - a.AddFields("acctest", - map[string]interface{}{"value": int(101)}, - map[string]string{}) - a.AddFields("acctest", - map[string]interface{}{"value": int32(101)}, - map[string]string{"acc": "test"}) - a.AddFields("acctest", - map[string]interface{}{"value": int64(101)}, - map[string]string{"acc": "test"}, now) - - testm := <-a.metrics - actual := testm.String() - assert.Contains(t, actual, "acctest,default=tag value=101i") - - testm = <-a.metrics - actual = testm.String() - assert.Contains(t, actual, "acctest,acc=test,default=tag value=101i") - - testm = <-a.metrics - actual = testm.String() - assert.Equal(t, - fmt.Sprintf("acctest,acc=test,default=tag value=101i %d", now.UnixNano()), - actual) -} - -func TestAddFloats(t *testing.T) { - a := accumulator{} - a.addDefaultTag("default", "tag") - now := time.Now() - a.metrics = make(chan telegraf.Metric, 10) - defer close(a.metrics) - a.inputConfig = &models.InputConfig{} - - a.AddFields("acctest", - map[string]interface{}{"value": float32(101)}, - map[string]string{"acc": "test"}) - a.AddFields("acctest", - map[string]interface{}{"value": float64(101)}, - map[string]string{"acc": "test"}, now) - - testm := <-a.metrics - actual := testm.String() - assert.Contains(t, actual, "acctest,acc=test,default=tag value=101") - - testm = <-a.metrics - actual = testm.String() - assert.Equal(t, - fmt.Sprintf("acctest,acc=test,default=tag value=101 %d", now.UnixNano()), - actual) -} - -func TestAddStrings(t *testing.T) { - a := accumulator{} - a.addDefaultTag("default", "tag") - now := time.Now() - a.metrics = make(chan telegraf.Metric, 10) - defer close(a.metrics) - a.inputConfig = &models.InputConfig{} - - a.AddFields("acctest", - map[string]interface{}{"value": "test"}, - map[string]string{"acc": "test"}) - a.AddFields("acctest", - map[string]interface{}{"value": "foo"}, - map[string]string{"acc": "test"}, now) - - testm := <-a.metrics - actual := testm.String() - assert.Contains(t, actual, "acctest,acc=test,default=tag value=\"test\"") - - testm = <-a.metrics - actual = testm.String() - assert.Equal(t, - fmt.Sprintf("acctest,acc=test,default=tag value=\"foo\" %d", now.UnixNano()), - actual) -} - -func TestAddBools(t *testing.T) { - a := accumulator{} - a.addDefaultTag("default", "tag") - now := time.Now() - a.metrics = make(chan telegraf.Metric, 10) - defer close(a.metrics) - a.inputConfig = &models.InputConfig{} - - a.AddFields("acctest", - map[string]interface{}{"value": true}, map[string]string{"acc": "test"}) - a.AddFields("acctest", - map[string]interface{}{"value": false}, map[string]string{"acc": "test"}, now) - - testm := <-a.metrics - actual := testm.String() - assert.Contains(t, actual, "acctest,acc=test,default=tag value=true") - - testm = <-a.metrics - actual = testm.String() - assert.Equal(t, - fmt.Sprintf("acctest,acc=test,default=tag value=false %d", now.UnixNano()), - actual) -} - -// Test that tag filters get applied to metrics. -func TestAccFilterTags(t *testing.T) { - a := accumulator{} - now := time.Now() - a.metrics = make(chan telegraf.Metric, 10) - defer close(a.metrics) - filter := models.Filter{ - TagExclude: []string{"acc"}, - } - assert.NoError(t, filter.Compile()) - a.inputConfig = &models.InputConfig{} - a.inputConfig.Filter = filter - - a.AddFields("acctest", - map[string]interface{}{"value": float64(101)}, - map[string]string{}) - a.AddFields("acctest", - map[string]interface{}{"value": float64(101)}, - map[string]string{"acc": "test"}) - a.AddFields("acctest", - map[string]interface{}{"value": float64(101)}, - map[string]string{"acc": "test"}, now) - - testm := <-a.metrics + testm := <-metrics actual := testm.String() assert.Contains(t, actual, "acctest value=101") + assert.Equal(t, testm.Type(), telegraf.Gauge) - testm = <-a.metrics + testm = <-metrics actual = testm.String() - assert.Contains(t, actual, "acctest value=101") + assert.Contains(t, actual, "acctest,acc=test value=101") + assert.Equal(t, testm.Type(), telegraf.Gauge) - testm = <-a.metrics + testm = <-metrics actual = testm.String() assert.Equal(t, - fmt.Sprintf("acctest value=101 %d", now.UnixNano()), + fmt.Sprintf("acctest,acc=test value=101 %d", now.UnixNano()), actual) + assert.Equal(t, testm.Type(), telegraf.Gauge) } -func TestAccAddError(t *testing.T) { - errBuf := bytes.NewBuffer(nil) - log.SetOutput(errBuf) - defer log.SetOutput(os.Stderr) +func TestAddCounter(t *testing.T) { + now := time.Now() + metrics := make(chan telegraf.Metric, 10) + defer close(metrics) + a := NewAccumulator(&TestMetricMaker{}, metrics) - a := accumulator{} - a.inputConfig = &models.InputConfig{} - a.inputConfig.Name = "mock_plugin" + a.AddCounter("acctest", + map[string]interface{}{"value": float64(101)}, + map[string]string{}) + a.AddCounter("acctest", + map[string]interface{}{"value": float64(101)}, + map[string]string{"acc": "test"}) + a.AddCounter("acctest", + map[string]interface{}{"value": float64(101)}, + map[string]string{"acc": "test"}, now) - a.AddError(fmt.Errorf("foo")) - a.AddError(fmt.Errorf("bar")) - a.AddError(fmt.Errorf("baz")) + testm := <-metrics + actual := testm.String() + assert.Contains(t, actual, "acctest value=101") + assert.Equal(t, testm.Type(), telegraf.Counter) - errs := bytes.Split(errBuf.Bytes(), []byte{'\n'}) - assert.EqualValues(t, 3, a.errCount) - require.Len(t, errs, 4) // 4 because of trailing newline - assert.Contains(t, string(errs[0]), "mock_plugin") - assert.Contains(t, string(errs[0]), "foo") - assert.Contains(t, string(errs[1]), "mock_plugin") - assert.Contains(t, string(errs[1]), "bar") - assert.Contains(t, string(errs[2]), "mock_plugin") - assert.Contains(t, string(errs[2]), "baz") + testm = <-metrics + actual = testm.String() + assert.Contains(t, actual, "acctest,acc=test value=101") + assert.Equal(t, testm.Type(), telegraf.Counter) + + testm = <-metrics + actual = testm.String() + assert.Equal(t, + fmt.Sprintf("acctest,acc=test value=101 %d", now.UnixNano()), + actual) + assert.Equal(t, testm.Type(), telegraf.Counter) +} + +type TestMetricMaker struct { +} + +func (tm *TestMetricMaker) Name() string { + return "TestPlugin" +} +func (tm *TestMetricMaker) MakeMetric( + measurement string, + fields map[string]interface{}, + tags map[string]string, + mType telegraf.ValueType, + t time.Time, +) telegraf.Metric { + switch mType { + case telegraf.Untyped: + if m, err := telegraf.NewMetric(measurement, tags, fields, t); err == nil { + return m + } + case telegraf.Counter: + if m, err := telegraf.NewCounterMetric(measurement, tags, fields, t); err == nil { + return m + } + case telegraf.Gauge: + if m, err := telegraf.NewGaugeMetric(measurement, tags, fields, t); err == nil { + return m + } + } + return nil } diff --git a/agent/agent.go b/agent/agent.go index 8fef8ca41..a912126ba 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -89,7 +89,7 @@ func panicRecover(input *models.RunningInput) { trace := make([]byte, 2048) runtime.Stack(trace, true) log.Printf("E! FATAL: Input [%s] panicked: %s, Stack:\n%s\n", - input.Name, err, trace) + input.Name(), err, trace) log.Println("E! PLEASE REPORT THIS PANIC ON GITHUB with " + "stack trace, configuration, and OS information: " + "https://github.com/influxdata/telegraf/issues/new") @@ -103,19 +103,18 @@ func (a *Agent) gatherer( input *models.RunningInput, interval time.Duration, metricC chan telegraf.Metric, -) error { +) { defer panicRecover(input) ticker := time.NewTicker(interval) defer ticker.Stop() for { - var outerr error - - acc := NewAccumulator(input.Config, metricC) + acc := NewAccumulator(input, metricC) acc.SetPrecision(a.Config.Agent.Precision.Duration, a.Config.Agent.Interval.Duration) - acc.setDefaultTags(a.Config.Tags) + input.SetDebug(a.Config.Agent.Debug) + input.SetDefaultTags(a.Config.Tags) internal.RandomSleep(a.Config.Agent.CollectionJitter.Duration, shutdown) @@ -123,15 +122,13 @@ func (a *Agent) gatherer( gatherWithTimeout(shutdown, input, acc, interval) elapsed := time.Since(start) - if outerr != nil { - return outerr - } + log.Printf("D! Input [%s] gathered metrics, (%s interval) in %s\n", - input.Name, interval, elapsed) + input.Name(), interval, elapsed) select { case <-shutdown: - return nil + return case <-ticker.C: continue } @@ -160,13 +157,13 @@ func gatherWithTimeout( select { case err := <-done: if err != nil { - log.Printf("E! ERROR in input [%s]: %s", input.Name, err) + log.Printf("E! ERROR in input [%s]: %s", input.Name(), err) } return case <-ticker.C: log.Printf("E! ERROR: input [%s] took longer to collect than "+ "collection interval (%s)", - input.Name, timeout) + input.Name(), timeout) continue case <-shutdown: return @@ -194,13 +191,13 @@ func (a *Agent) Test() error { }() for _, input := range a.Config.Inputs { - acc := NewAccumulator(input.Config, metricC) - acc.SetTrace(true) + acc := NewAccumulator(input, metricC) acc.SetPrecision(a.Config.Agent.Precision.Duration, a.Config.Agent.Interval.Duration) - acc.setDefaultTags(a.Config.Tags) + input.SetTrace(true) + input.SetDefaultTags(a.Config.Tags) - fmt.Printf("* Plugin: %s, Collection 1\n", input.Name) + fmt.Printf("* Plugin: %s, Collection 1\n", input.Name()) if input.Config.Interval != 0 { fmt.Printf("* Internal: %s\n", input.Config.Interval) } @@ -214,10 +211,10 @@ func (a *Agent) Test() error { // Special instructions for some inputs. cpu, for example, needs to be // run twice in order to return cpu usage percentages. - switch input.Name { + switch input.Name() { case "cpu", "mongodb", "procstat": time.Sleep(500 * time.Millisecond) - fmt.Printf("* Plugin: %s, Collection 2\n", input.Name) + fmt.Printf("* Plugin: %s, Collection 2\n", input.Name()) if err := input.Input.Gather(acc); err != nil { return err } @@ -250,26 +247,69 @@ func (a *Agent) flush() { func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) error { // Inelegant, but this sleep is to allow the Gather threads to run, so that // the flusher will flush after metrics are collected. - time.Sleep(time.Millisecond * 200) + time.Sleep(time.Millisecond * 300) + + // create an output metric channel and a gorouting that continously passes + // each metric onto the output plugins & aggregators. + outMetricC := make(chan telegraf.Metric, 100) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-shutdown: + for _, agg := range a.Config.Aggregators { + agg.Aggregator.Stop() + } + if len(outMetricC) > 0 { + // keep going until outMetricC is flushed + continue + } + return + case m := <-outMetricC: + // if dropOriginal is set to true, then we will only send this + // metric to the aggregators, not the outputs. + var dropOriginal bool + if !m.IsAggregate() { + for _, agg := range a.Config.Aggregators { + if ok := agg.Apply(copyMetric(m)); ok { + dropOriginal = true + } + } + } + if !dropOriginal { + for i, o := range a.Config.Outputs { + if i == len(a.Config.Outputs)-1 { + o.AddMetric(m) + } else { + o.AddMetric(copyMetric(m)) + } + } + } + } + } + }() ticker := time.NewTicker(a.Config.Agent.FlushInterval.Duration) - for { select { case <-shutdown: log.Println("I! Hang on, flushing any cached metrics before shutdown") + // wait for outMetricC to get flushed before flushing outputs + wg.Wait() a.flush() return nil case <-ticker.C: internal.RandomSleep(a.Config.Agent.FlushJitter.Duration, shutdown) a.flush() - case m := <-metricC: - for i, o := range a.Config.Outputs { - if i == len(a.Config.Outputs)-1 { - o.AddMetric(m) - } else { - o.AddMetric(copyMetric(m)) + case metric := <-metricC: + mS := []telegraf.Metric{metric} + for _, processor := range a.Config.Processors { + mS = processor.Apply(mS...) } + for _, m := range mS { + outMetricC <- m } } } @@ -303,18 +343,18 @@ func (a *Agent) Run(shutdown chan struct{}) error { // channel shared between all input threads for accumulating metrics metricC := make(chan telegraf.Metric, 10000) + // Start all ServicePlugins for _, input := range a.Config.Inputs { - // Start service of any ServicePlugins switch p := input.Input.(type) { case telegraf.ServiceInput: - acc := NewAccumulator(input.Config, metricC) + acc := NewAccumulator(input, metricC) // Service input plugins should set their own precision of their // metrics. - acc.DisablePrecision() - acc.setDefaultTags(a.Config.Tags) + acc.SetPrecision(time.Nanosecond, 0) + input.SetDefaultTags(a.Config.Tags) if err := p.Start(acc); err != nil { log.Printf("E! Service for input %s failed to start, exiting\n%s\n", - input.Name, err.Error()) + input.Name(), err.Error()) return err } defer p.Stop() @@ -327,6 +367,18 @@ func (a *Agent) Run(shutdown chan struct{}) error { time.Sleep(time.Duration(i - (time.Now().UnixNano() % i))) } + // Start all Aggregators + for _, aggregator := range a.Config.Aggregators { + acc := NewAccumulator(aggregator, metricC) + acc.SetPrecision(a.Config.Agent.Precision.Duration, + a.Config.Agent.Interval.Duration) + if err := aggregator.Aggregator.Start(acc); err != nil { + log.Printf("[%s] failed to start, exiting\n%s\n", + aggregator.Name(), err.Error()) + return err + } + } + wg.Add(1) go func() { defer wg.Done() diff --git a/aggregator.go b/aggregator.go new file mode 100644 index 000000000..2a881c3a4 --- /dev/null +++ b/aggregator.go @@ -0,0 +1,16 @@ +package telegraf + +type Aggregator interface { + // SampleConfig returns the default configuration of the Input + SampleConfig() string + + // Description returns a one-sentence description on the Input + Description() string + + // Apply the metric to the aggregator + Apply(in Metric) + + // Start starts the service filter with the given accumulator + Start(acc Accumulator) error + Stop() +} diff --git a/cmd/telegraf/telegraf.go b/cmd/telegraf/telegraf.go index dd8d8431b..cf6d7158d 100644 --- a/cmd/telegraf/telegraf.go +++ b/cmd/telegraf/telegraf.go @@ -12,12 +12,13 @@ import ( "github.com/influxdata/telegraf/agent" "github.com/influxdata/telegraf/internal/config" + _ "github.com/influxdata/telegraf/plugins/aggregators/all" "github.com/influxdata/telegraf/logger" "github.com/influxdata/telegraf/plugins/inputs" _ "github.com/influxdata/telegraf/plugins/inputs/all" "github.com/influxdata/telegraf/plugins/outputs" _ "github.com/influxdata/telegraf/plugins/outputs/all" - + _ "github.com/influxdata/telegraf/plugins/processors/all" "github.com/kardianos/service" ) @@ -111,6 +112,8 @@ Examples: telegraf -config telegraf.conf -input-filter cpu:mem -output-filter influxdb ` +var logger service.Logger + var stop chan struct{} var srvc service.Service @@ -306,6 +309,10 @@ func main() { if err != nil { log.Fatal(err) } + logger, err = s.Logger(nil) + if err != nil { + log.Fatal(err) + } // Handle the -service flag here to prevent any issues with tooling that // may not have an interactive session, e.g. installing from Ansible. if *fService != "" { diff --git a/internal/config/config.go b/internal/config/config.go index b76c9b520..3dc2c02ee 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -11,15 +11,18 @@ import ( "regexp" "runtime" "sort" + "strconv" "strings" "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal/models" + "github.com/influxdata/telegraf/plugins/aggregators" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/plugins/processors" "github.com/influxdata/telegraf/plugins/serializers" "github.com/influxdata/config" @@ -47,9 +50,11 @@ type Config struct { InputFilters []string OutputFilters []string - Agent *AgentConfig - Inputs []*models.RunningInput - Outputs []*models.RunningOutput + Agent *AgentConfig + Inputs []*models.RunningInput + Outputs []*models.RunningOutput + Processors []*models.RunningProcessor + Aggregators []*models.RunningAggregator } func NewConfig() *Config { @@ -64,6 +69,7 @@ func NewConfig() *Config { Tags: make(map[string]string), Inputs: make([]*models.RunningInput, 0), Outputs: make([]*models.RunningOutput, 0), + Processors: make([]*models.RunningProcessor, 0), InputFilters: make([]string, 0), OutputFilters: make([]string, 0), } @@ -138,7 +144,7 @@ type AgentConfig struct { func (c *Config) InputNames() []string { var name []string for _, input := range c.Inputs { - name = append(name, input.Name) + name = append(name, input.Name()) } return name } @@ -248,6 +254,20 @@ var header = `# Telegraf Configuration ############################################################################### ` +var processorHeader = ` + +############################################################################### +# PROCESSOR PLUGINS # +############################################################################### +` + +var aggregatorHeader = ` + +############################################################################### +# AGGREGATOR PLUGINS # +############################################################################### +` + var inputHeader = ` ############################################################################### @@ -266,6 +286,7 @@ var serviceInputHeader = ` func PrintSampleConfig(inputFilters []string, outputFilters []string) { fmt.Printf(header) + // print output plugins if len(outputFilters) != 0 { printFilteredOutputs(outputFilters, false) } else { @@ -281,6 +302,25 @@ func PrintSampleConfig(inputFilters []string, outputFilters []string) { printFilteredOutputs(pnames, true) } + // print processor plugins + fmt.Printf(processorHeader) + pnames := []string{} + for pname := range processors.Processors { + pnames = append(pnames, pname) + } + sort.Strings(pnames) + printFilteredProcessors(pnames, true) + + // pring aggregator plugins + fmt.Printf(aggregatorHeader) + pnames = []string{} + for pname := range aggregators.Aggregators { + pnames = append(pnames, pname) + } + sort.Strings(pnames) + printFilteredAggregators(pnames, true) + + // print input plugins fmt.Printf(inputHeader) if len(inputFilters) != 0 { printFilteredInputs(inputFilters, false) @@ -298,6 +338,42 @@ func PrintSampleConfig(inputFilters []string, outputFilters []string) { } } +func printFilteredProcessors(processorFilters []string, commented bool) { + // Filter processors + var pnames []string + for pname := range processors.Processors { + if sliceContains(pname, processorFilters) { + pnames = append(pnames, pname) + } + } + sort.Strings(pnames) + + // Print Outputs + for _, pname := range pnames { + creator := processors.Processors[pname] + output := creator() + printConfig(pname, output, "processors", commented) + } +} + +func printFilteredAggregators(aggregatorFilters []string, commented bool) { + // Filter outputs + var anames []string + for aname := range aggregators.Aggregators { + if sliceContains(aname, aggregatorFilters) { + anames = append(anames, aname) + } + } + sort.Strings(anames) + + // Print Outputs + for _, aname := range anames { + creator := aggregators.Aggregators[aname] + output := creator() + printConfig(aname, output, "aggregators", commented) + } +} + func printFilteredInputs(inputFilters []string, commented bool) { // Filter inputs var pnames []string @@ -507,6 +583,7 @@ func (c *Config) LoadConfig(path string) error { case "outputs": for pluginName, pluginVal := range subTable.Fields { switch pluginSubTable := pluginVal.(type) { + // legacy [outputs.influxdb] support case *ast.Table: if err = c.addOutput(pluginName, pluginSubTable); err != nil { return fmt.Errorf("Error parsing %s, %s", path, err) @@ -525,6 +602,7 @@ func (c *Config) LoadConfig(path string) error { case "inputs", "plugins": for pluginName, pluginVal := range subTable.Fields { switch pluginSubTable := pluginVal.(type) { + // legacy [inputs.cpu] support case *ast.Table: if err = c.addInput(pluginName, pluginSubTable); err != nil { return fmt.Errorf("Error parsing %s, %s", path, err) @@ -540,6 +618,34 @@ func (c *Config) LoadConfig(path string) error { pluginName, path) } } + case "processors": + for pluginName, pluginVal := range subTable.Fields { + switch pluginSubTable := pluginVal.(type) { + case []*ast.Table: + for _, t := range pluginSubTable { + if err = c.addProcessor(pluginName, t); err != nil { + return fmt.Errorf("Error parsing %s, %s", path, err) + } + } + default: + return fmt.Errorf("Unsupported config format: %s, file %s", + pluginName, path) + } + } + case "aggregators": + for pluginName, pluginVal := range subTable.Fields { + switch pluginSubTable := pluginVal.(type) { + case []*ast.Table: + for _, t := range pluginSubTable { + if err = c.addAggregator(pluginName, t); err != nil { + return fmt.Errorf("Error parsing %s, %s", path, err) + } + } + default: + return fmt.Errorf("Unsupported config format: %s, file %s", + pluginName, path) + } + } // Assume it's an input input for legacy config file support if no other // identifiers are present default: @@ -580,6 +686,57 @@ func parseFile(fpath string) (*ast.Table, error) { return toml.Parse(contents) } +func (c *Config) addAggregator(name string, table *ast.Table) error { + creator, ok := aggregators.Aggregators[name] + if !ok { + return fmt.Errorf("Undefined but requested aggregator: %s", name) + } + aggregator := creator() + + aggregatorConfig, err := buildAggregator(name, table) + if err != nil { + return err + } + + if err := config.UnmarshalTable(table, aggregator); err != nil { + return err + } + + rf := &models.RunningAggregator{ + Aggregator: aggregator, + Config: aggregatorConfig, + } + + c.Aggregators = append(c.Aggregators, rf) + return nil +} + +func (c *Config) addProcessor(name string, table *ast.Table) error { + creator, ok := processors.Processors[name] + if !ok { + return fmt.Errorf("Undefined but requested processor: %s", name) + } + processor := creator() + + processorConfig, err := buildProcessor(name, table) + if err != nil { + return err + } + + if err := config.UnmarshalTable(table, processor); err != nil { + return err + } + + rf := &models.RunningProcessor{ + Name: name, + Processor: processor, + Config: processorConfig, + } + + c.Processors = append(c.Processors, rf) + return nil +} + func (c *Config) addOutput(name string, table *ast.Table) error { if len(c.OutputFilters) > 0 && !sliceContains(name, c.OutputFilters) { return nil @@ -652,7 +809,6 @@ func (c *Config) addInput(name string, table *ast.Table) error { } rp := &models.RunningInput{ - Name: name, Input: input, Config: pluginConfig, } @@ -660,6 +816,93 @@ func (c *Config) addInput(name string, table *ast.Table) error { return nil } +// buildAggregator TODO doc +func buildAggregator(name string, tbl *ast.Table) (*models.AggregatorConfig, error) { + conf := &models.AggregatorConfig{Name: name} + unsupportedFields := []string{"tagexclude", "taginclude"} + for _, field := range unsupportedFields { + if _, ok := tbl.Fields[field]; ok { + // TODO raise error because field is not supported + } + } + + if node, ok := tbl.Fields["drop_original"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if b, ok := kv.Value.(*ast.Boolean); ok { + var err error + conf.DropOriginal, err = strconv.ParseBool(b.Value) + if err != nil { + log.Printf("Error parsing boolean value for %s: %s\n", name, err) + } + } + } + } + + if node, ok := tbl.Fields["name_prefix"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + conf.MeasurementPrefix = str.Value + } + } + } + + if node, ok := tbl.Fields["name_suffix"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + conf.MeasurementSuffix = str.Value + } + } + } + + if node, ok := tbl.Fields["name_override"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + conf.NameOverride = str.Value + } + } + } + + conf.Tags = make(map[string]string) + if node, ok := tbl.Fields["tags"]; ok { + if subtbl, ok := node.(*ast.Table); ok { + if err := config.UnmarshalTable(subtbl, conf.Tags); err != nil { + log.Printf("Could not parse tags for input %s\n", name) + } + } + } + + delete(tbl.Fields, "drop_original") + delete(tbl.Fields, "name_prefix") + delete(tbl.Fields, "name_suffix") + delete(tbl.Fields, "name_override") + delete(tbl.Fields, "tags") + var err error + conf.Filter, err = buildFilter(tbl) + if err != nil { + return conf, err + } + return conf, nil +} + +// buildProcessor TODO doc +func buildProcessor(name string, tbl *ast.Table) (*models.ProcessorConfig, error) { + conf := &models.ProcessorConfig{Name: name} + unsupportedFields := []string{"pass", "fieldpass", "drop", "fielddrop", + "tagexclude", "taginclude"} + for _, field := range unsupportedFields { + if _, ok := tbl.Fields[field]; ok { + // TODO raise error because field is not supported + } + } + + var err error + conf.Filter, err = buildFilter(tbl) + if err != nil { + return conf, err + } + return conf, nil +} + // buildFilter builds a Filter // (tagpass/tagdrop/namepass/namedrop/fieldpass/fielddrop) to // be inserted into the models.OutputConfig/models.InputConfig diff --git a/internal/models/filter.go b/internal/models/filter.go index b87c59501..8e080f4a8 100644 --- a/internal/models/filter.go +++ b/internal/models/filter.go @@ -96,7 +96,7 @@ func (f *Filter) Compile() error { // Apply applies the filter to the given measurement name, fields map, and // tags map. It will return false if the metric should be "filtered out", and // true if the metric should "pass". -// It will modify tags in-place if they need to be deleted. +// It will modify tags & fields in-place if they need to be deleted. func (f *Filter) Apply( measurement string, fields map[string]interface{}, diff --git a/internal/models/makemetric.go b/internal/models/makemetric.go new file mode 100644 index 000000000..b198109a7 --- /dev/null +++ b/internal/models/makemetric.go @@ -0,0 +1,150 @@ +package models + +import ( + "log" + "math" + "time" + + "github.com/influxdata/telegraf" +) + +// makemetric is used by both RunningAggregator & RunningInput +// to make metrics. +// nameOverride: override the name of the measurement being made. +// namePrefix: add this prefix to each measurement name. +// nameSuffix: add this suffix to each measurement name. +// pluginTags: these are tags that are specific to this plugin. +// daemonTags: these are daemon-wide global tags, and get applied after pluginTags. +// filter: this is a filter to apply to each metric being made. +// applyFilter: if false, the above filter is not applied to each metric. +// This is used by Aggregators, because aggregators use filters +// on incoming metrics instead of on created metrics. +func makemetric( + measurement string, + fields map[string]interface{}, + tags map[string]string, + nameOverride string, + namePrefix string, + nameSuffix string, + pluginTags map[string]string, + daemonTags map[string]string, + filter Filter, + applyFilter bool, + debug bool, + mType telegraf.ValueType, + t time.Time, +) telegraf.Metric { + if len(fields) == 0 || len(measurement) == 0 { + return nil + } + if tags == nil { + tags = make(map[string]string) + } + + // Override measurement name if set + if len(nameOverride) != 0 { + measurement = nameOverride + } + // Apply measurement prefix and suffix if set + if len(namePrefix) != 0 { + measurement = namePrefix + measurement + } + if len(nameSuffix) != 0 { + measurement = measurement + nameSuffix + } + + // Apply plugin-wide tags if set + for k, v := range pluginTags { + if _, ok := tags[k]; !ok { + tags[k] = v + } + } + // Apply daemon-wide tags if set + for k, v := range daemonTags { + if _, ok := tags[k]; !ok { + tags[k] = v + } + } + + // Apply the metric filter(s) + // for aggregators, the filter does not get applied when the metric is made. + // instead, the filter is applied to metric incoming into the plugin. + // ie, it gets applied in the RunningAggregator.Apply function. + if applyFilter { + if ok := filter.Apply(measurement, fields, tags); !ok { + return nil + } + } + + for k, v := range fields { + // Validate uint64 and float64 fields + // convert all int & uint types to int64 + switch val := v.(type) { + case uint: + fields[k] = int64(val) + continue + case uint8: + fields[k] = int64(val) + continue + case uint16: + fields[k] = int64(val) + continue + case uint32: + fields[k] = int64(val) + continue + case int: + fields[k] = int64(val) + continue + case int8: + fields[k] = int64(val) + continue + case int16: + fields[k] = int64(val) + continue + case int32: + fields[k] = int64(val) + continue + case uint64: + // InfluxDB does not support writing uint64 + if val < uint64(9223372036854775808) { + fields[k] = int64(val) + } else { + fields[k] = int64(9223372036854775807) + } + continue + case float32: + fields[k] = float64(val) + continue + case float64: + // NaNs are invalid values in influxdb, skip measurement + if math.IsNaN(val) || math.IsInf(val, 0) { + if debug { + log.Printf("Measurement [%s] field [%s] has a NaN or Inf "+ + "field, skipping", + measurement, k) + } + delete(fields, k) + continue + } + } + + fields[k] = v + } + + var m telegraf.Metric + var err error + switch mType { + case telegraf.Counter: + m, err = telegraf.NewCounterMetric(measurement, tags, fields, t) + case telegraf.Gauge: + m, err = telegraf.NewGaugeMetric(measurement, tags, fields, t) + default: + m, err = telegraf.NewMetric(measurement, tags, fields, t) + } + if err != nil { + log.Printf("Error adding point [%s]: %s\n", measurement, err.Error()) + return nil + } + + return m +} diff --git a/internal/models/running_aggregator.go b/internal/models/running_aggregator.go new file mode 100644 index 000000000..d344565f2 --- /dev/null +++ b/internal/models/running_aggregator.go @@ -0,0 +1,79 @@ +package models + +import ( + "time" + + "github.com/influxdata/telegraf" +) + +type RunningAggregator struct { + Aggregator telegraf.Aggregator + Config *AggregatorConfig +} + +// AggregatorConfig containing configuration parameters for the running +// aggregator plugin. +type AggregatorConfig struct { + Name string + + DropOriginal bool + NameOverride string + MeasurementPrefix string + MeasurementSuffix string + Tags map[string]string + Filter Filter +} + +func (r *RunningAggregator) Name() string { + return "aggregators." + r.Config.Name +} + +func (r *RunningAggregator) MakeMetric( + measurement string, + fields map[string]interface{}, + tags map[string]string, + mType telegraf.ValueType, + t time.Time, +) telegraf.Metric { + m := makemetric( + measurement, + fields, + tags, + r.Config.NameOverride, + r.Config.MeasurementPrefix, + r.Config.MeasurementSuffix, + r.Config.Tags, + nil, + r.Config.Filter, + false, + false, + mType, + t, + ) + + m.SetAggregate(true) + + return m +} + +// Apply applies the given metric to the aggregator. +// Before applying to the plugin, it will run any defined filters on the metric. +// Apply returns true if the original metric should be dropped. +func (r *RunningAggregator) Apply(in telegraf.Metric) bool { + if r.Config.Filter.IsActive() { + // check if the aggregator should apply this metric + name := in.Name() + fields := in.Fields() + tags := in.Tags() + t := in.Time() + if ok := r.Config.Filter.Apply(name, fields, tags); !ok { + // aggregator should not apply this metric + return false + } + + in, _ = telegraf.NewMetric(name, tags, fields, t) + } + + r.Aggregator.Apply(in) + return r.Config.DropOriginal +} diff --git a/internal/models/running_aggregator_test.go b/internal/models/running_aggregator_test.go new file mode 100644 index 000000000..d2f04e202 --- /dev/null +++ b/internal/models/running_aggregator_test.go @@ -0,0 +1,150 @@ +package models + +import ( + "fmt" + "testing" + "time" + + "github.com/influxdata/telegraf" + + "github.com/stretchr/testify/assert" +) + +func TestApply(t *testing.T) { + a := &TestAggregator{} + ra := RunningAggregator{ + Config: &AggregatorConfig{ + Name: "TestRunningAggregator", + Filter: Filter{ + NamePass: []string{"*"}, + }, + }, + Aggregator: a, + } + assert.NoError(t, ra.Config.Filter.Compile()) + + m := ra.MakeMetric( + "RITest", + map[string]interface{}{"value": int(101)}, + map[string]string{}, + telegraf.Untyped, + time.Now(), + ) + assert.False(t, ra.Apply(m)) + assert.Equal(t, int64(101), a.sum) +} + +func TestApplyDropOriginal(t *testing.T) { + ra := RunningAggregator{ + Config: &AggregatorConfig{ + Name: "TestRunningAggregator", + Filter: Filter{ + NamePass: []string{"RI*"}, + }, + DropOriginal: true, + }, + Aggregator: &TestAggregator{}, + } + assert.NoError(t, ra.Config.Filter.Compile()) + + m := ra.MakeMetric( + "RITest", + map[string]interface{}{"value": int(101)}, + map[string]string{}, + telegraf.Untyped, + time.Now(), + ) + assert.True(t, ra.Apply(m)) + + // this metric name doesn't match the filter, so Apply will return false + m2 := ra.MakeMetric( + "foobar", + map[string]interface{}{"value": int(101)}, + map[string]string{}, + telegraf.Untyped, + time.Now(), + ) + assert.False(t, ra.Apply(m2)) +} + +// make an untyped, counter, & gauge metric +func TestMakeMetricA(t *testing.T) { + now := time.Now() + ra := RunningAggregator{ + Config: &AggregatorConfig{ + Name: "TestRunningAggregator", + }, + } + assert.Equal(t, "aggregators.TestRunningAggregator", ra.Name()) + + m := ra.MakeMetric( + "RITest", + map[string]interface{}{"value": int(101)}, + map[string]string{}, + telegraf.Untyped, + now, + ) + assert.Equal( + t, + m.String(), + fmt.Sprintf("RITest value=101i %d", now.UnixNano()), + ) + assert.Equal( + t, + m.Type(), + telegraf.Untyped, + ) + + m = ra.MakeMetric( + "RITest", + map[string]interface{}{"value": int(101)}, + map[string]string{}, + telegraf.Counter, + now, + ) + assert.Equal( + t, + m.String(), + fmt.Sprintf("RITest value=101i %d", now.UnixNano()), + ) + assert.Equal( + t, + m.Type(), + telegraf.Counter, + ) + + m = ra.MakeMetric( + "RITest", + map[string]interface{}{"value": int(101)}, + map[string]string{}, + telegraf.Gauge, + now, + ) + assert.Equal( + t, + m.String(), + fmt.Sprintf("RITest value=101i %d", now.UnixNano()), + ) + assert.Equal( + t, + m.Type(), + telegraf.Gauge, + ) +} + +type TestAggregator struct { + sum int64 +} + +func (t *TestAggregator) Description() string { return "" } +func (t *TestAggregator) SampleConfig() string { return "" } +func (t *TestAggregator) Start(acc telegraf.Accumulator) error { return nil } +func (t *TestAggregator) Stop() {} + +func (t *TestAggregator) Apply(in telegraf.Metric) { + for _, v := range in.Fields() { + if vi, ok := v.(int64); ok { + t.sum += vi + } + } +} diff --git a/internal/models/running_input.go b/internal/models/running_input.go index 445c5ee96..558af3e5c 100644 --- a/internal/models/running_input.go +++ b/internal/models/running_input.go @@ -1,15 +1,19 @@ package models import ( + "fmt" "time" "github.com/influxdata/telegraf" ) type RunningInput struct { - Name string Input telegraf.Input Config *InputConfig + + trace bool + debug bool + defaultTags map[string]string } // InputConfig containing a name, interval, and filter @@ -22,3 +26,59 @@ type InputConfig struct { Filter Filter Interval time.Duration } + +func (r *RunningInput) Name() string { + return "inputs." + r.Config.Name +} + +// MakeMetric either returns a metric, or returns nil if the metric doesn't +// need to be created (because of filtering, an error, etc.) +func (r *RunningInput) MakeMetric( + measurement string, + fields map[string]interface{}, + tags map[string]string, + mType telegraf.ValueType, + t time.Time, +) telegraf.Metric { + m := makemetric( + measurement, + fields, + tags, + r.Config.NameOverride, + r.Config.MeasurementPrefix, + r.Config.MeasurementSuffix, + r.Config.Tags, + r.defaultTags, + r.Config.Filter, + true, + r.debug, + mType, + t, + ) + + if r.trace && m != nil { + fmt.Println("> " + m.String()) + } + + return m +} + +func (r *RunningInput) Debug() bool { + return r.debug +} + +func (r *RunningInput) SetDebug(debug bool) { + r.debug = debug +} + +func (r *RunningInput) Trace() bool { + return r.trace +} + +func (r *RunningInput) SetTrace(trace bool) { + r.trace = trace +} + +func (r *RunningInput) SetDefaultTags(tags map[string]string) { + r.defaultTags = tags +} diff --git a/internal/models/running_input_test.go b/internal/models/running_input_test.go new file mode 100644 index 000000000..12283057d --- /dev/null +++ b/internal/models/running_input_test.go @@ -0,0 +1,326 @@ +package models + +import ( + "fmt" + "math" + "testing" + "time" + + "github.com/influxdata/telegraf" + + "github.com/stretchr/testify/assert" +) + +func TestMakeMetricNoFields(t *testing.T) { + now := time.Now() + ri := RunningInput{ + Config: &InputConfig{ + Name: "TestRunningInput", + }, + } + + m := ri.MakeMetric( + "RITest", + map[string]interface{}{}, + map[string]string{}, + telegraf.Untyped, + now, + ) + assert.Nil(t, m) +} + +// make an untyped, counter, & gauge metric +func TestMakeMetric(t *testing.T) { + now := time.Now() + ri := RunningInput{ + Config: &InputConfig{ + Name: "TestRunningInput", + }, + } + ri.SetDebug(true) + assert.Equal(t, true, ri.Debug()) + ri.SetTrace(true) + assert.Equal(t, true, ri.Trace()) + assert.Equal(t, "inputs.TestRunningInput", ri.Name()) + + m := ri.MakeMetric( + "RITest", + map[string]interface{}{"value": int(101)}, + map[string]string{}, + telegraf.Untyped, + now, + ) + assert.Equal( + t, + m.String(), + fmt.Sprintf("RITest value=101i %d", now.UnixNano()), + ) + assert.Equal( + t, + m.Type(), + telegraf.Untyped, + ) + + m = ri.MakeMetric( + "RITest", + map[string]interface{}{"value": int(101)}, + map[string]string{}, + telegraf.Counter, + now, + ) + assert.Equal( + t, + m.String(), + fmt.Sprintf("RITest value=101i %d", now.UnixNano()), + ) + assert.Equal( + t, + m.Type(), + telegraf.Counter, + ) + + m = ri.MakeMetric( + "RITest", + map[string]interface{}{"value": int(101)}, + map[string]string{}, + telegraf.Gauge, + now, + ) + assert.Equal( + t, + m.String(), + fmt.Sprintf("RITest value=101i %d", now.UnixNano()), + ) + assert.Equal( + t, + m.Type(), + telegraf.Gauge, + ) +} + +func TestMakeMetricWithPluginTags(t *testing.T) { + now := time.Now() + ri := RunningInput{ + Config: &InputConfig{ + Name: "TestRunningInput", + Tags: map[string]string{ + "foo": "bar", + }, + }, + } + ri.SetDebug(true) + assert.Equal(t, true, ri.Debug()) + ri.SetTrace(true) + assert.Equal(t, true, ri.Trace()) + + m := ri.MakeMetric( + "RITest", + map[string]interface{}{"value": int(101)}, + nil, + telegraf.Untyped, + now, + ) + assert.Equal( + t, + m.String(), + fmt.Sprintf("RITest,foo=bar value=101i %d", now.UnixNano()), + ) +} + +func TestMakeMetricFilteredOut(t *testing.T) { + now := time.Now() + ri := RunningInput{ + Config: &InputConfig{ + Name: "TestRunningInput", + Tags: map[string]string{ + "foo": "bar", + }, + Filter: Filter{NamePass: []string{"foobar"}}, + }, + } + ri.SetDebug(true) + assert.Equal(t, true, ri.Debug()) + ri.SetTrace(true) + assert.Equal(t, true, ri.Trace()) + assert.NoError(t, ri.Config.Filter.Compile()) + + m := ri.MakeMetric( + "RITest", + map[string]interface{}{"value": int(101)}, + nil, + telegraf.Untyped, + now, + ) + assert.Nil(t, m) +} + +func TestMakeMetricWithDaemonTags(t *testing.T) { + now := time.Now() + ri := RunningInput{ + Config: &InputConfig{ + Name: "TestRunningInput", + }, + } + ri.SetDefaultTags(map[string]string{ + "foo": "bar", + }) + ri.SetDebug(true) + assert.Equal(t, true, ri.Debug()) + ri.SetTrace(true) + assert.Equal(t, true, ri.Trace()) + + m := ri.MakeMetric( + "RITest", + map[string]interface{}{"value": int(101)}, + map[string]string{}, + telegraf.Untyped, + now, + ) + assert.Equal( + t, + m.String(), + fmt.Sprintf("RITest,foo=bar value=101i %d", now.UnixNano()), + ) +} + +// make an untyped, counter, & gauge metric +func TestMakeMetricInfFields(t *testing.T) { + inf := math.Inf(1) + ninf := math.Inf(-1) + now := time.Now() + ri := RunningInput{ + Config: &InputConfig{ + Name: "TestRunningInput", + }, + } + ri.SetDebug(true) + assert.Equal(t, true, ri.Debug()) + ri.SetTrace(true) + assert.Equal(t, true, ri.Trace()) + + m := ri.MakeMetric( + "RITest", + map[string]interface{}{ + "value": int(101), + "inf": inf, + "ninf": ninf, + }, + map[string]string{}, + telegraf.Untyped, + now, + ) + assert.Equal( + t, + m.String(), + fmt.Sprintf("RITest value=101i %d", now.UnixNano()), + ) +} + +func TestMakeMetricAllFieldTypes(t *testing.T) { + now := time.Now() + ri := RunningInput{ + Config: &InputConfig{ + Name: "TestRunningInput", + }, + } + ri.SetDebug(true) + assert.Equal(t, true, ri.Debug()) + ri.SetTrace(true) + assert.Equal(t, true, ri.Trace()) + + m := ri.MakeMetric( + "RITest", + map[string]interface{}{ + "a": int(10), + "b": int8(10), + "c": int16(10), + "d": int32(10), + "e": uint(10), + "f": uint8(10), + "g": uint16(10), + "h": uint32(10), + "i": uint64(10), + "j": float32(10), + "k": uint64(9223372036854775810), + "l": "foobar", + "m": true, + }, + map[string]string{}, + telegraf.Untyped, + now, + ) + assert.Equal( + t, + fmt.Sprintf("RITest a=10i,b=10i,c=10i,d=10i,e=10i,f=10i,g=10i,h=10i,i=10i,j=10,k=9223372036854775807i,l=\"foobar\",m=true %d", now.UnixNano()), + m.String(), + ) +} + +func TestMakeMetricNameOverride(t *testing.T) { + now := time.Now() + ri := RunningInput{ + Config: &InputConfig{ + Name: "TestRunningInput", + NameOverride: "foobar", + }, + } + + m := ri.MakeMetric( + "RITest", + map[string]interface{}{"value": int(101)}, + map[string]string{}, + telegraf.Untyped, + now, + ) + assert.Equal( + t, + m.String(), + fmt.Sprintf("foobar value=101i %d", now.UnixNano()), + ) +} + +func TestMakeMetricNamePrefix(t *testing.T) { + now := time.Now() + ri := RunningInput{ + Config: &InputConfig{ + Name: "TestRunningInput", + MeasurementPrefix: "foobar_", + }, + } + + m := ri.MakeMetric( + "RITest", + map[string]interface{}{"value": int(101)}, + map[string]string{}, + telegraf.Untyped, + now, + ) + assert.Equal( + t, + m.String(), + fmt.Sprintf("foobar_RITest value=101i %d", now.UnixNano()), + ) +} + +func TestMakeMetricNameSuffix(t *testing.T) { + now := time.Now() + ri := RunningInput{ + Config: &InputConfig{ + Name: "TestRunningInput", + MeasurementSuffix: "_foobar", + }, + } + + m := ri.MakeMetric( + "RITest", + map[string]interface{}{"value": int(101)}, + map[string]string{}, + telegraf.Untyped, + now, + ) + assert.Equal( + t, + m.String(), + fmt.Sprintf("RITest_foobar value=101i %d", now.UnixNano()), + ) +} diff --git a/internal/models/running_output_test.go b/internal/models/running_output_test.go index a42d6fc7e..2bca79a06 100644 --- a/internal/models/running_output_test.go +++ b/internal/models/running_output_test.go @@ -132,7 +132,6 @@ func TestRunningOutput_PassFilter(t *testing.T) { func TestRunningOutput_TagIncludeNoMatch(t *testing.T) { conf := &OutputConfig{ Filter: Filter{ - TagInclude: []string{"nothing*"}, }, } @@ -154,7 +153,6 @@ func TestRunningOutput_TagIncludeNoMatch(t *testing.T) { func TestRunningOutput_TagExcludeMatch(t *testing.T) { conf := &OutputConfig{ Filter: Filter{ - TagExclude: []string{"tag*"}, }, } @@ -176,7 +174,6 @@ func TestRunningOutput_TagExcludeMatch(t *testing.T) { func TestRunningOutput_TagExcludeNoMatch(t *testing.T) { conf := &OutputConfig{ Filter: Filter{ - TagExclude: []string{"nothing*"}, }, } @@ -198,7 +195,6 @@ func TestRunningOutput_TagExcludeNoMatch(t *testing.T) { func TestRunningOutput_TagIncludeMatch(t *testing.T) { conf := &OutputConfig{ Filter: Filter{ - TagInclude: []string{"tag*"}, }, } diff --git a/internal/models/running_processor.go b/internal/models/running_processor.go new file mode 100644 index 000000000..f4f483f6d --- /dev/null +++ b/internal/models/running_processor.go @@ -0,0 +1,37 @@ +package models + +import ( + "github.com/influxdata/telegraf" +) + +type RunningProcessor struct { + Name string + Processor telegraf.Processor + Config *ProcessorConfig +} + +// FilterConfig containing a name and filter +type ProcessorConfig struct { + Name string + Filter Filter +} + +func (rp *RunningProcessor) Apply(in ...telegraf.Metric) []telegraf.Metric { + ret := []telegraf.Metric{} + + for _, metric := range in { + if rp.Config.Filter.IsActive() { + // check if the filter should be applied to this metric + if ok := rp.Config.Filter.Apply(metric.Name(), metric.Fields(), metric.Tags()); !ok { + // this means filter should not be applied + ret = append(ret, metric) + continue + } + } + // This metric should pass through the filter, so call the filter Apply + // function and append results to the output slice. + ret = append(ret, rp.Processor.Apply(metric)...) + } + + return ret +} diff --git a/internal/models/running_processor_test.go b/internal/models/running_processor_test.go new file mode 100644 index 000000000..8a691a9b8 --- /dev/null +++ b/internal/models/running_processor_test.go @@ -0,0 +1,117 @@ +package models + +import ( + "testing" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/testutil" + + "github.com/stretchr/testify/assert" +) + +type TestProcessor struct { +} + +func (f *TestProcessor) SampleConfig() string { return "" } +func (f *TestProcessor) Description() string { return "" } + +// Apply renames: +// "foo" to "fuz" +// "bar" to "baz" +// And it also drops measurements named "dropme" +func (f *TestProcessor) Apply(in ...telegraf.Metric) []telegraf.Metric { + out := make([]telegraf.Metric, 0) + for _, m := range in { + switch m.Name() { + case "foo": + out = append(out, testutil.TestMetric(1, "fuz")) + case "bar": + out = append(out, testutil.TestMetric(1, "baz")) + case "dropme": + // drop the metric! + default: + out = append(out, m) + } + } + return out +} + +func NewTestRunningProcessor() *RunningProcessor { + out := &RunningProcessor{ + Name: "test", + Processor: &TestProcessor{}, + Config: &ProcessorConfig{Filter: Filter{}}, + } + return out +} + +func TestRunningProcessor(t *testing.T) { + inmetrics := []telegraf.Metric{ + testutil.TestMetric(1, "foo"), + testutil.TestMetric(1, "bar"), + testutil.TestMetric(1, "baz"), + } + + expectedNames := []string{ + "fuz", + "baz", + "baz", + } + rfp := NewTestRunningProcessor() + filteredMetrics := rfp.Apply(inmetrics...) + + actualNames := []string{ + filteredMetrics[0].Name(), + filteredMetrics[1].Name(), + filteredMetrics[2].Name(), + } + assert.Equal(t, expectedNames, actualNames) +} + +func TestRunningProcessor_WithNameDrop(t *testing.T) { + inmetrics := []telegraf.Metric{ + testutil.TestMetric(1, "foo"), + testutil.TestMetric(1, "bar"), + testutil.TestMetric(1, "baz"), + } + + expectedNames := []string{ + "foo", + "baz", + "baz", + } + rfp := NewTestRunningProcessor() + + rfp.Config.Filter.NameDrop = []string{"foo"} + assert.NoError(t, rfp.Config.Filter.Compile()) + + filteredMetrics := rfp.Apply(inmetrics...) + + actualNames := []string{ + filteredMetrics[0].Name(), + filteredMetrics[1].Name(), + filteredMetrics[2].Name(), + } + assert.Equal(t, expectedNames, actualNames) +} + +func TestRunningProcessor_DroppedMetric(t *testing.T) { + inmetrics := []telegraf.Metric{ + testutil.TestMetric(1, "dropme"), + testutil.TestMetric(1, "foo"), + testutil.TestMetric(1, "bar"), + } + + expectedNames := []string{ + "fuz", + "baz", + } + rfp := NewTestRunningProcessor() + filteredMetrics := rfp.Apply(inmetrics...) + + actualNames := []string{ + filteredMetrics[0].Name(), + filteredMetrics[1].Name(), + } + assert.Equal(t, expectedNames, actualNames) +} diff --git a/metric.go b/metric.go index 937603cdc..9209de731 100644 --- a/metric.go +++ b/metric.go @@ -4,6 +4,7 @@ import ( "time" "github.com/influxdata/influxdb/client/v2" + "github.com/influxdata/influxdb/models" ) // ValueType is an enumeration of metric types that represent a simple value. @@ -33,6 +34,9 @@ type Metric interface { // UnixNano returns the unix nano time of the metric UnixNano() int64 + // HashID returns a non-cryptographic hash of the metric (name + tags) + HashID() uint64 + // Fields returns the fields for the metric Fields() map[string]interface{} @@ -44,13 +48,21 @@ type Metric interface { // Point returns a influxdb client.Point object Point() *client.Point + + // SetAggregate sets the metric's aggregate status + // This is so that aggregate metrics don't get re-sent to aggregator plugins + SetAggregate(bool) + // IsAggregate returns true if the metric is an aggregate + IsAggregate() bool } // metric is a wrapper of the influxdb client.Point struct type metric struct { - pt *client.Point + pt models.Point mType ValueType + + isaggregate bool } // NewMetric returns an untyped metric. @@ -60,7 +72,7 @@ func NewMetric( fields map[string]interface{}, t time.Time, ) (Metric, error) { - pt, err := client.NewPoint(name, tags, fields, t) + pt, err := models.NewPoint(name, tags, fields, t) if err != nil { return nil, err } @@ -79,7 +91,7 @@ func NewGaugeMetric( fields map[string]interface{}, t time.Time, ) (Metric, error) { - pt, err := client.NewPoint(name, tags, fields, t) + pt, err := models.NewPoint(name, tags, fields, t) if err != nil { return nil, err } @@ -98,7 +110,7 @@ func NewCounterMetric( fields map[string]interface{}, t time.Time, ) (Metric, error) { - pt, err := client.NewPoint(name, tags, fields, t) + pt, err := models.NewPoint(name, tags, fields, t) if err != nil { return nil, err } @@ -124,6 +136,10 @@ func (m *metric) Type() ValueType { return m.mType } +func (m *metric) HashID() uint64 { + return m.pt.HashID() +} + func (m *metric) UnixNano() int64 { return m.pt.UnixNano() } @@ -141,5 +157,13 @@ func (m *metric) PrecisionString(precison string) string { } func (m *metric) Point() *client.Point { - return m.pt + return client.NewPointFrom(m.pt) +} + +func (m *metric) IsAggregate() bool { + return m.isaggregate +} + +func (m *metric) SetAggregate(b bool) { + m.isaggregate = b } diff --git a/plugins/aggregators/all/all.go b/plugins/aggregators/all/all.go new file mode 100644 index 000000000..1041a0c9c --- /dev/null +++ b/plugins/aggregators/all/all.go @@ -0,0 +1,5 @@ +package all + +import ( + _ "github.com/influxdata/telegraf/plugins/aggregators/minmax" +) diff --git a/plugins/aggregators/minmax/minmax.go b/plugins/aggregators/minmax/minmax.go new file mode 100644 index 000000000..e628ad7ac --- /dev/null +++ b/plugins/aggregators/minmax/minmax.go @@ -0,0 +1,192 @@ +package minmax + +import ( + "sync" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/aggregators" +) + +type MinMax struct { + Period internal.Duration + + // metrics waiting to be processed + metrics chan telegraf.Metric + shutdown chan struct{} + wg sync.WaitGroup + + // caches for metric fields, names, and tags + fieldCache map[uint64]map[string]minmax + nameCache map[uint64]string + tagCache map[uint64]map[string]string + + acc telegraf.Accumulator +} + +type minmax struct { + min interface{} + max interface{} +} + +var sampleConfig = ` + ## TODO doc + period = "30s" +` + +func (m *MinMax) SampleConfig() string { + return sampleConfig +} + +func (m *MinMax) Description() string { + return "Keep the aggregate min/max of each metric passing through." +} + +func (m *MinMax) Apply(in telegraf.Metric) { + m.metrics <- in +} + +func (m *MinMax) apply(in telegraf.Metric) { + id := in.HashID() + if _, ok := m.nameCache[id]; !ok { + // hit an uncached metric, create caches for first time: + m.nameCache[id] = in.Name() + m.tagCache[id] = in.Tags() + m.fieldCache[id] = make(map[string]minmax) + for k, v := range in.Fields() { + m.fieldCache[id][k] = minmax{ + min: v, + max: v, + } + } + } else { + for k, v := range in.Fields() { + cmpmin := compare(m.fieldCache[id][k].min, v) + cmpmax := compare(m.fieldCache[id][k].max, v) + if cmpmin == 1 { + tmp := m.fieldCache[id][k] + tmp.min = v + m.fieldCache[id][k] = tmp + } + if cmpmax == -1 { + tmp := m.fieldCache[id][k] + tmp.max = v + m.fieldCache[id][k] = tmp + } + } + } +} + +func (m *MinMax) Start(acc telegraf.Accumulator) error { + m.metrics = make(chan telegraf.Metric, 10) + m.shutdown = make(chan struct{}) + m.clearCache() + m.acc = acc + m.wg.Add(1) + if m.Period.Duration > 0 { + go m.periodHandler() + } else { + go m.continuousHandler() + } + return nil +} + +func (m *MinMax) Stop() { + close(m.shutdown) + m.wg.Wait() +} + +func (m *MinMax) addfields(id uint64) { + fields := map[string]interface{}{} + for k, v := range m.fieldCache[id] { + fields[k+"_min"] = v.min + fields[k+"_max"] = v.max + } + m.acc.AddFields(m.nameCache[id], fields, m.tagCache[id]) +} + +func (m *MinMax) clearCache() { + m.fieldCache = make(map[uint64]map[string]minmax) + m.nameCache = make(map[uint64]string) + m.tagCache = make(map[uint64]map[string]string) +} + +// periodHandler only adds the aggregate metrics on the configured Period. +// thus if telegraf's collection interval is 10s, and period is 30s, there +// will only be one aggregate sent every 3 metrics. +func (m *MinMax) periodHandler() { + // TODO make this sleep less of a hack! + time.Sleep(time.Millisecond * 200) + defer m.wg.Done() + ticker := time.NewTicker(m.Period.Duration) + defer ticker.Stop() + for { + select { + case in := <-m.metrics: + m.apply(in) + case <-m.shutdown: + if len(m.metrics) > 0 { + continue + } + return + case <-ticker.C: + for id, _ := range m.nameCache { + m.addfields(id) + } + m.clearCache() + } + } +} + +// continuousHandler sends one metric for every metric that passes through it. +func (m *MinMax) continuousHandler() { + defer m.wg.Done() + for { + select { + case in := <-m.metrics: + m.apply(in) + m.addfields(in.HashID()) + case <-m.shutdown: + if len(m.metrics) > 0 { + continue + } + return + } + } +} + +func compare(a, b interface{}) int { + switch at := a.(type) { + case int64: + if bt, ok := b.(int64); ok { + if at < bt { + return -1 + } else if at > bt { + return 1 + } + return 0 + } else { + return 0 + } + case float64: + if bt, ok := b.(float64); ok { + if at < bt { + return -1 + } else if at > bt { + return 1 + } + return 0 + } else { + return 0 + } + default: + return 0 + } +} + +func init() { + aggregators.Add("minmax", func() telegraf.Aggregator { + return &MinMax{} + }) +} diff --git a/plugins/aggregators/minmax/minmax_test.go b/plugins/aggregators/minmax/minmax_test.go new file mode 100644 index 000000000..5a854d91b --- /dev/null +++ b/plugins/aggregators/minmax/minmax_test.go @@ -0,0 +1,51 @@ +package minmax + +import ( + "testing" + "time" + + "github.com/influxdata/telegraf" +) + +func BenchmarkApply(b *testing.B) { + minmax := MinMax{} + minmax.clearCache() + + m1, _ := telegraf.NewMetric("m1", + map[string]string{"foo": "bar"}, + map[string]interface{}{ + "a": int64(1), + "b": int64(1), + "c": int64(1), + "d": int64(1), + "e": int64(1), + "f": float64(2), + "g": float64(2), + "h": float64(2), + "i": float64(2), + "j": float64(3), + }, + time.Now(), + ) + m2, _ := telegraf.NewMetric("m1", + map[string]string{"foo": "bar"}, + map[string]interface{}{ + "a": int64(3), + "b": int64(3), + "c": int64(3), + "d": int64(3), + "e": int64(3), + "f": float64(1), + "g": float64(1), + "h": float64(1), + "i": float64(1), + "j": float64(1), + }, + time.Now(), + ) + + for n := 0; n < b.N; n++ { + minmax.apply(m1) + minmax.apply(m2) + } +} diff --git a/plugins/aggregators/registry.go b/plugins/aggregators/registry.go new file mode 100644 index 000000000..77a9c9a64 --- /dev/null +++ b/plugins/aggregators/registry.go @@ -0,0 +1,11 @@ +package aggregators + +import "github.com/influxdata/telegraf" + +type Creator func() telegraf.Aggregator + +var Aggregators = map[string]Creator{} + +func Add(name string, creator Creator) { + Aggregators[name] = creator +} diff --git a/plugins/inputs/http_listener/http_listener_test.go b/plugins/inputs/http_listener/http_listener_test.go index ed04cf860..267ba56a1 100644 --- a/plugins/inputs/http_listener/http_listener_test.go +++ b/plugins/inputs/http_listener/http_listener_test.go @@ -99,14 +99,14 @@ func TestWriteHTTPHighTraffic(t *testing.T) { var wg sync.WaitGroup for i := 0; i < 10; i++ { wg.Add(1) - go func() { + go func(innerwg *sync.WaitGroup) { + defer innerwg.Done() for i := 0; i < 500; i++ { resp, err := http.Post("http://localhost:8286/write?db=mydb", "", bytes.NewBuffer([]byte(testMsgs))) require.NoError(t, err) require.EqualValues(t, 204, resp.StatusCode) } - wg.Done() - }() + }(&wg) } wg.Wait() diff --git a/plugins/inputs/postgresql_extensible/postgresql_extensible.go b/plugins/inputs/postgresql_extensible/postgresql_extensible.go index 199262c0b..beb010fce 100644 --- a/plugins/inputs/postgresql_extensible/postgresql_extensible.go +++ b/plugins/inputs/postgresql_extensible/postgresql_extensible.go @@ -29,6 +29,7 @@ type Postgresql struct { Tagvalue string Measurement string } + Debug bool } type query []struct { diff --git a/plugins/processors/all/all.go b/plugins/processors/all/all.go new file mode 100644 index 000000000..462298f6b --- /dev/null +++ b/plugins/processors/all/all.go @@ -0,0 +1,5 @@ +package all + +import ( + _ "github.com/influxdata/telegraf/plugins/processors/printer" +) diff --git a/plugins/processors/printer/printer.go b/plugins/processors/printer/printer.go new file mode 100644 index 000000000..a65a104e6 --- /dev/null +++ b/plugins/processors/printer/printer.go @@ -0,0 +1,35 @@ +package printer + +import ( + "fmt" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/processors" +) + +type Printer struct { +} + +var sampleConfig = ` +` + +func (p *Printer) SampleConfig() string { + return sampleConfig +} + +func (p *Printer) Description() string { + return "Print all metrics that pass through this filter." +} + +func (p *Printer) Apply(in ...telegraf.Metric) []telegraf.Metric { + for _, metric := range in { + fmt.Println(metric.String()) + } + return in +} + +func init() { + processors.Add("printer", func() telegraf.Processor { + return &Printer{} + }) +} diff --git a/plugins/processors/printer/printer_test.go b/plugins/processors/printer/printer_test.go new file mode 100644 index 000000000..e69de29bb diff --git a/plugins/processors/registry.go b/plugins/processors/registry.go new file mode 100644 index 000000000..592c688f3 --- /dev/null +++ b/plugins/processors/registry.go @@ -0,0 +1,11 @@ +package processors + +import "github.com/influxdata/telegraf" + +type Creator func() telegraf.Processor + +var Processors = map[string]Creator{} + +func Add(name string, creator Creator) { + Processors[name] = creator +} diff --git a/processor.go b/processor.go new file mode 100644 index 000000000..f2b5133a5 --- /dev/null +++ b/processor.go @@ -0,0 +1,12 @@ +package telegraf + +type Processor interface { + // SampleConfig returns the default configuration of the Input + SampleConfig() string + + // Description returns a one-sentence description on the Input + Description() string + + // Apply the filter to the given metric + Apply(in ...Metric) []Metric +}